Implantar um pipeline

Neste documento, explicamos em detalhes como o Dataflow implanta e executa um pipeline e abrange tópicos avançados, como otimização e balanceamento de carga. Se você estiver procurando um guia passo a passo sobre como criar e implantar seu primeiro pipeline, use os guias de início rápido do Dataflow para Java , Python ou modelos.

Depois de construir e testar o pipeline do Apache Beam, é possível usar o serviço gerenciado do Dataflow para implantá-lo e executá-lo. No serviço do Dataflow, o código do pipeline torna-se um job do Dataflow.

O serviço do Dataflow gerencia totalmente serviços do Google Cloud como o Compute Engine e o Cloud Storage para executar um job do Dataflow, ativando e eliminando automaticamente os recursos necessários. Com o serviço do Dataflow, é possível visualizar o job com as ferramentas Interface de monitoramento do Dataflow e Interface de linha de comando do Dataflow.

Definir parâmetros de execução no código do pipeline permite controlar alguns aspectos de como o serviço do Dataflow executa o job. Por exemplo, os parâmetros de execução especificam se as etapas do pipeline são executadas em máquinas virtuais de worker, no back-end do serviço Dataflow ou localmente.

Além de gerenciar recursos do Google Cloud, o serviço do Dataflow executa e otimiza automaticamente muitos aspectos do processamento paralelo distribuído. Isso inclui o seguinte:

  • Carregamento em paralelo e distribuição O Dataflow particiona automaticamente os dados e distribui o código do trabalhador nas instâncias do Compute Engine para processamento paralelo. Para mais informações, consulte paralelismo e distribuição.
  • Otimização. O Dataflow usa o código do pipeline para criar um gráfico de execução que representa as PCollections e transformações, e otimiza esse gráfico para ter o desempenho e o uso de recursos mais eficientes. Além disso, otimiza de modo automático as operações de custo potencialmente alto, como agregações de dados. Para mais informações, consulte Otimização de fusão e Otimização de combinação.
  • Recursos de ajuste automático. O serviço Dataflow inclui vários recursos que fornecem ajuste rápido de alocação de recursos e particionamento de dados, como escalonamento automático horizontal, escalonamento automático vertical e Reequilíbrio dinâmico de trabalho. Esses recursos ajudam o serviço do Dataflow a executar o job da forma mais rápida e eficiente possível.

Ciclo de vida do pipeline: do código do pipeline ao job do Dataflow

Quando você executa o pipeline do Dataflow, o Dataflow cria um gráfico de execução a partir do código que constrói o objeto Pipeline, incluindo todas as transformações e as respectivas funções de processamento, como DoFn. Essa fase é chamada de tempo de construção do gráfico e é executada localmente no computador em que o pipeline é executado.

Durante a construção do gráfico, o Apache Beam executa localmente o código a partir do ponto de entrada principal do código do pipeline, interrompendo as chamadas para uma etapa de origem, coletor ou transformação e transformando essas chamadas em nós do gráfico. Como consequência, uma parte do código em um ponto de entrada do pipeline (método main() do Java ou o nível superior de um script Python) é executada localmente na máquina que executa o pipeline, enquanto o mesmo código declarado em um método de um objeto DoFn é executado nos workers do Dataflow.

Durante a construção do gráfico, o Apache Beam também valida que os recursos referenciados pelo pipeline (como buckets do Cloud Storage, tabelas do BigQuery e tópicos ou assinaturas do Pub/Sub) realmente existem e são acessíveis. A validação é feita por meio de chamadas de API padrão para os respectivos serviços. Portanto, é essencial que a conta de usuário usada para executar um pipeline tenha conectividade adequada com os serviços necessários e esteja autorizada a chamar as APIs. Antes de enviar o pipeline para o serviço Dataflow, o Apache Beam também verifica se há outros erros e garante que o gráfico do pipeline não contenha operações ilegais.

O gráfico de execução é convertido para o formato JSON e transmitido para o endpoint do serviço do Dataflow.

Observação: a construção do gráfico também acontece ao executar o pipeline no local, mas o gráfico não é convertido em JSON nem transmitido para o serviço. Em vez disso, é executado localmente na mesma máquina na qual o programa Dataflow foi iniciado. Para mais detalhes, consulte a documentação sobre configuração para execução local.

Em seguida, o serviço do Dataflow valida o gráfico de execução em JSON. Quando o gráfico é validado, torna-se um job no serviço Dataflow. Use a Interface de monitoramento do Dataflow para ver o job, o gráfico de execução, o status e as informações de registro.

Java

O serviço Dataflow envia uma resposta para a máquina que executa o programa do Dataflow. Essa resposta é encapsulada no objeto DataflowPipelineJob, que contém o jobId do job do Dataflow. Use o jobId para monitorar, rastrear e solucionar problemas do job com a Interface de monitoramento do Dataflow e a Interface de linha de comando do Dataflow. Consulte a referência da API de DataflowPipelineJob para mais informações.

Python

O serviço Dataflow envia uma resposta para a máquina que executa o programa do Dataflow. Essa resposta é encapsulada no objeto DataflowPipelineResult, que contém o job_id do job do Dataflow. Use o job_id para monitorar, rastrear e solucionar problemas do job com a Interface de monitoramento do Dataflow e a Interface de linha de comando do Dataflow.

Gráfico de execução

O Dataflow cria um gráfico de etapas que representa o pipeline, com base nas transformações e nos dados usados na construção do objeto Pipeline. Esse é o gráfico de execução do pipeline.

O exemplo de WordCount incluído nos SDKs do Apache Beam contém uma série de transformações para ler, extrair, contar, formatar e gravar palavras específicas em um conjunto de textos, junto com uma contagem de ocorrências de cada palavra. O seguinte diagrama mostra como as transformações do pipeline WordCount são expandidas em um gráfico de execução:

As transformações no programa de exemplo WordCount são expandidas em um gráfico de execução
              de etapas a serem executadas pelo serviço Dataflow.
Figura 1: gráfico de execução do exemplo WordCount

O gráfico de execução geralmente difere da ordem em que as transformações foram especificadas quando o pipeline foi construído. Isso ocorre porque o serviço Dataflow faz várias otimizações e fusões no gráfico de execução antes de executar recursos de nuvem gerenciados. O serviço Dataflow respeita as dependências de dados na execução do pipeline. No entanto, as etapas sem dependências de dados entre si podem ser executadas em qualquer ordem.

Selecione o job na Interface de monitoramento do Dataflow para ver o gráfico de execução não otimizado que o Dataflow gerou para o pipeline. Para mais informações sobre como visualizar jobs, consulte Como usar a interface de monitoramento do Dataflow.

Carregamento em paralelo e distribuição

O serviço do Dataflow carrega em paralelo e distribui automaticamente a lógica de processamento no pipeline para os workers que você alocou para executar o job. O Dataflow usa as abstrações do modelo de programação para representar funções de processamento paralelas. Por exemplo, as transformações ParDo fazem o Dataflow distribuir automaticamente o código de processamento (representado pelas DoFns) para que vários workers sejam executados em paralelo.

Como estruturar o código de usuário

Pense no código DoFn como pequenas entidades independentes: várias instâncias podem estar em execução em diferentes máquinas, sem que haja conhecimento umas das outras. Assim, as funções puras, aquelas que não dependem do estado oculto ou externo, são deterministas e não apresentam efeitos colaterais observáveis, são o código ideal para a natureza paralela e distribuída de DoFns.

O modelo de função pura não é estritamente rígido. No entanto, as informações de estado ou os dados de inicialização externos podem ser válidos para DoFn e outros objetos de função, desde que o código não dependa de coisas que o serviço do Dataflow não garante. Ao estruturar transformações ParDo e criar DoFns, lembre-se das seguintes diretrizes:

  • O Dataflow garante que todos os elementos da PCollection de entrada sejam processados por uma instância de DoFn apenas uma vez.
  • O serviço do Dataflow não garante a quantidade de vezes que um DoFn será invocado.
  • O serviço Dataflow não garante exatamente como os elementos distribuídos são agrupados, ou seja, não garante quais elementos, se houver, serão processados juntos.
  • O serviço do Dataflow não garante o número exato de instâncias de DoFn que serão criadas no decurso de um pipeline.
  • O serviço do Dataflow é tolerante a falhas e pode tentar novamente o código várias vezes em caso de problemas no worker. Além disso, o serviço do Dataflow pode criar cópias de backup do código e ter problemas com efeitos colaterais manuais, por exemplo, se o código depender de arquivos temporários ou criá-los com nomes não exclusivos.
  • O serviço do Dataflow serializa o processamento de elemento por instância de DoFn. Embora o código não precise ser estritamente seguro para linhas de execução, os estados compartilhados entre várias instâncias de DoFn precisam ser.

Consulte os requisitos para funções fornecidas pelo usuário (em inglês) na documentação do modelo de programação para mais informações sobre como criar o código de usuário.

Tratamento de erros e exceções

O pipeline pode lançar exceções durante o processamento de dados. Alguns desses erros são transitórios, por exemplo, dificuldade temporária em acessar um serviço externo, mas outros são permanentes, como erros causados pela entrada de dados corruptos ou não analisáveis, ou ponteiros nulos durante a computação.

O Dataflow processa elementos em pacotes arbitrários e repete o pacote completo quando um erro é gerado para qualquer elemento nesse pacote. Na execução no modo de lote, os pacotes que incluem um item com falha são repetidos quatro vezes. A falha do pipeline será total quando um único pacote falhar quatro vezes. Na execução no modo de streaming, um pacote que inclui um item com falha é repetido indefinidamente, o que pode causar a parada permanente do canal.

Erros de worker de inicialização, como falha na instalação de pacotes nos workers, são temporários, o que resulta em novas tentativas indefinidamente e pode causar a parada permanente do pipeline.

Otimização de fusão

Depois que o formato JSON do gráfico de execução do pipeline é validado, o serviço do Dataflow pode modificar o gráfico para fazer otimizações. Essas otimizações podem incluir a fusão de várias etapas ou transformações do gráfico de execução do canal em etapas únicas. A fusão de etapas evita que o serviço do Dataflow precise materializar cada PCollection intermediária no pipeline, o que pode custar caro em termos de memória e processamento.

Todas as transformações especificadas na construção do pipeline são executadas no serviço, mas elas podem ser executadas em uma ordem diferente ou como parte de uma transformação fusionada maior para garantir a execução mais eficiente do pipeline. O serviço do Dataflow respeita as dependências de dados entre as etapas no gráfico de execução. No entanto, em outras situações, as etapas podem ser executadas em qualquer ordem.

Exemplo de fusão

O diagrama a seguir mostra como o gráfico de execução do exemplo WordCount incluído no SDK do Apache Beam para Java pode ser otimizado e fusionado pelo serviço do Dataflow para uma execução eficiente:

O gráfico de execução do programa de exemplo WordCount otimizado e com etapas mescladas
              pelo serviço Dataflow.
Figura 2: gráfico de execução otimizado do exemplo WordCount

Como evitar a fusão

Em alguns casos no pipeline, é recomendável evitar que o serviço do Dataflow faça otimizações de fusão. São casos em que o serviço do Dataflow deduz incorretamente o meio ideal de fusionar operações no pipeline, limitando a capacidade do serviço do Dataflow de usar todos os workers disponíveis.

Por exemplo, um caso em que a fusão pode limitar a capacidade do Dataflow de otimizar o uso de workers é em uma ParDo de "alta distribuição de dados". Nessa operação, pode haver uma coleção de entrada com um número relativamente baixo de elementos, mas a ParDo produz uma saída com centenas ou milhares de elementos a mais, seguida por outra ParDo. Se o serviço do Dataflow fusionar essas operações ParDo em conjunto, o paralelismo nesta etapa será limitado a, no máximo, o número de itens da coleção de entrada, mesmo que a PCollection intermediária contenha muitos outros elementos.

Para evitar essa fusão, inclua no pipeline uma operação que obrigue o serviço do Dataflow a materializar a PCollection intermediária. Use uma das seguintes operações:

  • Insira uma GroupByKey e desagrupe depois da primeira ParDo. O serviço do Dataflow nunca faz a fusão de operações ParDo em uma agregação.
  • Transmita a PCollection intermediária como uma entrada secundária (em inglês) para outra ParDo. O serviço do Dataflow sempre materializa entradas secundárias.
  • Você pode inserir uma etapa Reshuffle. Reshuffle evita a fusão, verifica os dados e executa a eliminação de duplicação de registros. A reorganização é compatível com o Dataflow, mesmo que esteja marcada como obsoleta na documentação do Apache Beam.

Como monitorar a fusão

Para acessar o gráfico otimizado e os estágios fundidos, chame project.locations.jobs.get ou execute o seguinte comando gcloud:

  gcloud dataflow jobs describe --full $JOB_ID --format json

Os estágios de fusão são descritos no objeto ExecutionStageSummary dentro da matriz ComponentTransform no arquivo de resposta de saída. Considere encaminhar a saída para jq para extrair facilmente os bits relevantes usando o seguinte comando gcloud:

  gcloud dataflow jobs describe --full $JOB_ID --format json | jq '.pipelineDescription.executionPipelineStage[] | {"stage_id": .id, "stage_name": .name, "fused_steps": .componentTransform }'

Otimização de combinação

As operações de agregação são um conceito importante no processamento de dados em grande escala. A agregação reúne dados que são muito diferentes em termos de conceito, tornando-a extremamente útil para a correlação. O modelo de programação do Dataflow representa as operações de agregação como as transformações GroupByKey, CoGroupByKey e Combine.

As operações de agregação do Dataflow combinam dados de todo o conjunto de dados, inclusive os que podem ser distribuídos para vários workers. Durante essas operações, geralmente é mais eficiente combinar localmente o máximo possível de dados antes de combiná-los entre instâncias. Quando você aplica uma GroupByKey ou outra transformação de agregação, o serviço do Dataflow faz automaticamente a combinação parcial local antes da operação de agrupamento principal.

Na combinação parcial ou de vários níveis, o serviço Dataflow se orienta de acordo com o modo de execução do pipeline, ou seja, lote ou streaming. Para os dados limitados, o serviço favorece a eficiência e faz o máximo possível de combinações locais. Para dados ilimitados, o serviço favorece a latência mais baixa e não faz combinações parciais, já que isso eleva a latência.

Recurso de ajuste automático

O serviço Dataflow contém vários recursos de ajuste automático que podem otimizar de modo ainda mais dinâmico o job do Dataflow durante a execução. Esses recursos incluem o Escalonamento automático horizontal, o Escalonamento automático vertical e o Reequilíbrio dinâmico de trabalho.

Escalonamento automático horizontal

Com o escalonamento automático ativado, o serviço Cloud Dataflow seleciona automaticamente o número apropriado de instâncias de trabalho necessárias para executar o job. O serviço do Dataflow também realoca dinamicamente mais ou menos workers durante o tempo de execução de acordo com as características do job. Algumas partes do pipeline podem ser mais pesadas do que outras em termos computacionais, e o serviço do Dataflow pode ativar automaticamente mais workers durante essas fases do job e encerrá-los quando não forem mais necessários.

Java

O escalonamento automático horizontal é ativado por padrão nos jobs de streaming que usam o Streaming Engine e todos os jobs em lote. Para desativar o escalonamento automático, especifique a sinalização --autoscalingAlgorithm=NONE quando executar o pipeline. Nesse caso, observe que o serviço do Dataflow define o número de workers com base na opção --numWorkers, que é 3, por padrão.

Com o escalonamento automático ativado, o serviço do Dataflow não permite que o usuário tenha controle do número exato de instâncias de worker alocadas para o job. Também é possível limitar o número de workers. Para isso, especifique a opção --maxNumWorkers ao executar o pipeline.

Para jobs em lote, a sinalização --maxNumWorkers é opcional. O padrão é 1000. Para jobs de streaming, usando o Streaming Engine, a sinalização --maxNumWorkers é opcional. O padrão é 100. Para jobs de streaming que não usam o Streaming Engine, a sinalização --maxNumWorkers é obrigatória.

Python

O escalonamento automático é ativado por padrão em todos os jobs em lote do Dataflow criados com a versão 0.5.1 ou superior do SDK do Apache Beam para Python. Para desativar o escalonamento automático, especifique explicitamente a sinalização --autoscaling_algorithm=NONE quando executar o pipeline. Nesse caso, observe que o serviço do Dataflow define o número de workers com base na opção --num_workers, que é 3, por padrão.

Com o escalonamento automático horizontal ativado, o serviço Dataflow não permite controlar o número exato de instâncias de worker alocadas para o job. Também é possível limitar o número de workers. Para isso, especifique a opção --max_num_workers ao executar o pipeline.

Para jobs em lote, a sinalização --max_num_workers é opcional. O padrão é 1000. Para jobs de streaming, usando o Streaming Engine, a sinalização --max_num_workers é opcional. O padrão é 100. Para jobs de streaming que não usam o Streaming Engine, a sinalização --max_num_workers é obrigatória.

O Dataflow é escalonável com base no paralelismo de um pipeline. O paralelismo de um pipeline é uma estimativa do número de linhas de execução necessárias para processar os dados com mais eficiência em um determinado momento.

Escalonamento automático em lote

Para os pipelines em lote, o Dataflow escolhe automaticamente o número de workers com base na quantidade total estimada de trabalho em cada estágio do pipeline, que depende do tamanho da entrada e da capacidade atual. O Dataflow reavalia a quantidade de trabalho de acordo com o progresso da execução a cada 30 segundos, além de aumentar ou diminuir dinamicamente o número de workers à medida que a quantidade total de trabalho estimada aumenta ou diminui.

Se alguma das seguintes condições ocorrer, para salvar recursos inativos, o Dataflow manterá ou diminuirá o número de workers:

  • O uso médio de CPU do worker é menor que 5%.
  • O paralelismo é limitado devido a trabalhos não paralelos, como dados não divisíveis, como arquivos compactados ou dados processados por módulos de E/S que não são divididos.
  • O número de paralelismo é fixo, como gravação em arquivos atuais em um destino do Cloud Storage.
  • Se seu pipeline usa uma fonte de dados personalizada que você implementou, há alguns métodos que podem ser implementados para fornecer mais informações ao algoritmo de escalonamento automático do serviço Cloud Dataflow e, possivelmente, melhorar o desempenho:

    Java

    • Na subclasse BoundedSource, implemente o método getEstimatedSizeBytes. O serviço do Dataflow usa getEstimatedSizeBytes ao calcular o número inicial de workers que serão usados no pipeline.
    • Na subclasse BoundedReader, implemente o método getFractionConsumed. O serviço do Dataflow usa getFractionConsumed para acompanhar o progresso de leitura e convergir para o número correto de workers que serão usados durante uma leitura.

    Python

    • Na subclasse BoundedSource, implemente o método estimate_size. O serviço do Dataflow usa estimate_size ao calcular o número inicial de workers que serão usados no pipeline.
    • Na subclasse RangeTracker, implemente o método fraction_consumed. O serviço do Dataflow usa fraction_consumed para acompanhar o progresso de leitura e convergir para o número correto de workers que serão usados durante uma leitura.

    Escalonamento automático de streaming

    Com o escalonamento automático de streaming, o serviço do Dataflow altera de maneira adaptável o número de workers usados para executar o pipeline de streaming em resposta às mudanças na utilização de recursos e de carga. O escalonamento automático de streaming é oferecido gratuitamente e projetado para reduzir os custos dos recursos usados na execução de pipelines de streaming.

    O escalonamento automático de streaming determina quando escalonar, monitorando o tempo estimado do backlog. O tempo estimado do backlog é calculado com base na capacidade e nos bytes do backlog que ainda serão processados a partir da origem de entrada. Um pipeline é considerado "com backlog" quando o tempo estimado do backlog permanece acima de 15 segundos.

  • Escalonamento vertical: se um pipeline de streaming permanecer em backlog com os workers utilizando, em média, mais de 20% das CPUs por alguns minutos, o Dataflow será escalonado verticalmente. Os destinos do Dataflow limpam o backlog em aproximadamente 150 segundos após o escalonamento vertical, dada a capacidade atual por worker.
  • Redução de escalonamento vertical: se um backlog de pipeline de streaming for menor que 10 segundos e os workers estiverem utilizando, em média, menos de 75% das CPUs por um período de alguns minutos, o Dataflow reduzirá o escalonamento vertical. Após a redução, os workers utilizam, em média, 75% das CPUs. Em jobs de streaming que não usam o Streaming Engine, às vezes o uso da CPU não consegue chegar a 75%. Isso ocorre devido à distribuição do disco, em que cada worker precisa ter o mesmo número de discos permanentes. Assim, o uso da CPU é menor. Por exemplo, um job definido para usar no máximo 100 workers (com um disco por worker) pode ser reduzido para 50 workers (com dois discos por worker). Nesse job, não é possível alcançar 75% de uso da CPU porque a redução vertical seguinte de 100 workers é 50 workers, um número menor que os 75 workers necessários. Assim, o Dataflow não reduz esse job verticalmente, o que resulta em um uso da CPU inferior a 75%.
  • Sem escalonamento: se não houver backlog, mas o uso da CPU for 75% ou superior, o pipeline não reduz o escalonamento vertical. Se houver backlog, mas o uso da CPU for inferior a 20%, o pipeline não escalonará verticalmente.
  • Escalonamento preditivo: o mecanismo de streaming também usa uma técnica de escalonamento automático preditivo com base no temporizador de tempo. No modelo do Dataflow, os dados ilimitados em um pipeline de streaming são divididos em janelas agrupadas por carimbos de data/hora. No final de uma janela, os timers são acionados para cada chave processada nessa janela. O acionamento de um timer indica que a janela expirou para uma determinada chave. O mecanismo de streaming pode medir o backlog do timer, o que significa que ele pode prever quantos timers serão disparados no final de uma janela. Usar o backlog do timer como um sinal permite que o Dataflow "veja o futuro" estimando a quantidade de processamento que precisará acontecer quando futuros timers forem disparados. Com base na estimativa de carga futura, o Dataflow é escalonado automaticamente para atender à demanda esperada.
  • Quando o escalonamento automático não é usado, é possível escolher um número fixo de workers especificando numWorkers ou num_workers para executar o pipeline. Como a carga de trabalho de entrada varia com o passar do tempo, esse número pode ficar muito alto ou muito baixo. O provisionamento de workers em excesso resulta em custo adicional desnecessário, e o contrário resulta em latência mais alta para os dados processados. Com a ativação do escalonamento automático, os recursos são usados somente quando necessário.

    O objetivo do escalonamento automático de pipelines de streaming é minimizar o backlog e maximizar a capacidade e utilização de workers, além de reagir rapidamente aos picos de carga. Com o escalonamento automático ativado, você não precisa escolher entre provisionamento para carga de pico e atualização de resultados. Os workers são adicionados conforme a utilização de CPU e o backlog aumentam e são removidos quando essas métricas caem. Dessa forma, você paga apenas pelo que for necessário e o job é processado com a máxima eficiência possível.

    Java

    Fontes ilimitadas personalizadas

    Se o pipeline usa uma fonte personalizada ilimitada, essa fonte precisa informar o serviço do Dataflow sobre o backlog. O backlog é uma estimativa da entrada em bytes que ainda não foi processada pela fonte. Para informar o serviço sobre o backlog, implemente um dos métodos a seguir na classe UnboundedReader.

    • getSplitBacklogBytes(): backlog para a divisão atual da fonte. O serviço agrega o backlog em todas as divisões.
    • getTotalBacklogBytes(): backlog global em todas as divisões. Em alguns casos, o backlog não está disponível para cada divisão e só pode ser calculado em todas elas. Somente a primeira divisão (com ID "0") precisa fornecer o backlog total.
    O repositório do Apache Beam contém vários exemplos (em inglês) de fontes personalizadas que implementam a classe UnboundedReader.
    Ativar o escalonamento automático de streaming

    Para jobs de streaming que usam o Streaming Engine, o escalonamento automático é ativado por padrão.

    Para ativar o escalonamento automático de jobs que não usam o Streaming Engine, defina os seguintes parâmetros de execução ao iniciar o pipeline:

    --autoscalingAlgorithm=THROUGHPUT_BASED
    --maxNumWorkers=N
    

    Para jobs de streaming que não usam o Streaming Engine, o número mínimo de workers é 1/15 do valor --maxNumWorkers, arredondado.

    Os pipelines de streaming são implantados com um pool fixo de discos permanentes em mesmo número de --maxNumWorkers. Leve isso em consideração quando especificar --maxNumWorkers e verifique se esse valor é um número suficiente de discos para o pipeline.

    Uso e preços

    O cálculo da utilização do Compute Engine é baseado no número médio de workers e da utilização de discos permanentes, no valor exato de --maxNumWorkers. Esses discos são redistribuídos de modo que cada worker tenha um número igual de discos anexados.

    No exemplo acima, em que --maxNumWorkers=15, seria cobrado o preço de 1 a 15 instâncias do Compute Engine e de exatamente 15 discos permanentes.

    Python

    Ativar o escalonamento automático de streaming

    Para ativar o escalonamento automático, defina os seguintes parâmetros de execução quando iniciar o pipeline:

    --autoscaling_algorithm=THROUGHPUT_BASED
    --max_num_workers=N
    

    Para jobs de streaming que não usam o Streaming Engine, o número mínimo de workers é 1/15 do valor --maxNumWorkers, arredondado.

    Os pipelines de streaming são implantados com um pool fixo de discos permanentes em mesmo número de --maxNumWorkers. Leve isso em consideração quando especificar --maxNumWorkers e verifique se esse valor é um número suficiente de discos para o pipeline.

    Uso e preços

    O cálculo da utilização do Compute Engine é baseado no número médio de workers e da utilização de discos permanentes, no valor exato de --max_num_workers. Esses discos são redistribuídos de modo que cada worker tenha um número igual de discos anexados.

    No exemplo acima, em que --max_num_workers=15, seria cobrado o preço de 1 a 15 instâncias do Compute Engine e de exatamente 15 discos permanentes.

    Como escalonar manualmente um pipeline de streaming

    Até o escalonamento automático ficar disponível no modo de streaming, escalone manualmente o número de trabalhadores que executam o pipeline de streaming usando o recurso Atualizar do Dataflow.

    Java

    Para escalonar o pipeline de streaming durante a execução, defina os parâmetros de execução a seguir ao iniciar o pipeline:

    • Defina --maxNumWorkers como o número máximo de workers que você quer disponibilizar para o pipeline.
    • Defina --numWorkers como o número inicial de workers que você quer que o pipeline use quando começar a ser executado.

    Atualize o pipeline quando ele já estiver em execução e especifique um novo número de workers usando o parâmetro --numWorkers. É necessário que o novo valor de --numWorkers esteja entre N e --maxNumWorkers, em que N é igual a --maxNumWorkers/15.

    A atualização substitui o job em execução por um novo, usando o novo número de workers, mas preservando todas as informações de estado associadas ao job anterior.

    Python

    Para escalonar o pipeline de streaming durante a execução, defina os parâmetros de execução a seguir ao iniciar o pipeline:

    • Defina --max_num_workers como o número máximo de workers que você quer disponibilizar para o pipeline.
    • Defina --num_workers como o número inicial de workers que você quer que o pipeline use quando começar a ser executado.

    Atualize o pipeline quando ele já estiver em execução e especifique um novo número de workers usando o parâmetro --num_workers. É necessário que o novo valor de --num_workers esteja entre N e --max_num_workers, em que N é igual a --max_num_workers/15.

    A atualização substitui o job em execução por um novo, usando o novo número de workers, mas preservando todas as informações de estado associadas ao job anterior.

    Escalonamento automático vertical para pipelines de streaming em Python

    O escalonamento automático vertical é um recurso que permite que o Dataflow Prime aumente ou diminua dinamicamente a memória disponível para os workers de acordo com os requisitos do job. O recurso foi projetado para tornar os jobs resilientes a erros de falta de memória e para maximizar a eficiência do pipeline. O Dataflow Prime monitora o pipeline, detecta situações em que os workers faltam ou excedem a memória disponível e os substitui por novos workers por mais ou menos memória.

    O Escalonamento automático vertical é ativado quando você ativa o Dataflow Prime.

    Limitações de visualização

    • Somente jobs de streaming em Python podem ser escalonados verticalmente.
    • Somente a memória das VMs dos workers pode ser escalonada verticalmente.
    • O aumento da memória tem um limite superior de 8 GiB para cada vCPU. Ao usar GPUs, o limite máximo de aumento de memória é de 6,5 GiB para cada vCPU.
    • O redução da memória tem um limite inferior de 3 GiB para cada vCPU.

    Como monitorar o escalonamento automático vertical

    As operações de escalonamento automático vertical são publicadas nos registros de job e worker. Para visualizar esses registros, consulte Como usar a interface de monitoramento do Dataflow.

    Efeito do escalonamento automático horizontal

    "In"Dataflow prime, o escalonamento automático vertical funciona em conjuntoEscalonamento automático horizontal de dados. Essa combinação permite que o Dataflow Prime aumente ou diminua o número de workers de acordo com as necessidades do seu pipeline e maximize a utilização da capacidade de computação.

    Por padrão, o escalonamento automático vertical (que ajusta a memória da VM) ocorre com uma frequência menor que o escalonamento automático horizontal (que ajusta o número de VMs). O escalonamento automático horizontal é desativado durante e até 10 minutos após uma atualização ser acionada pelo escalonamento automático vertical. Se houver um backlog significativo de dados de entrada após essa marca de 10 minutos, o escalonamento automático horizontal provavelmente ocorrerá para limpar esse backlog. Para saber mais sobre o escalonamento automático horizontal para pipelines de streaming, consulte Escalonamento automático de streaming.

    Solução de problemas

    Nesta seção, fornecemos instruções para solucionar problemas comuns relacionados ao escalonamento automático vertical.

    O escalonamento automático vertical não parece funcionar. O que preciso verificar?
    • Verifique se é um job de streaming do Python. O escalonamento automático vertical não está disponível para outros tipos de job.
    • Verifique se você ativou a API Cloud Scaling para seu projeto do Google Cloud.
    • Verifique se o job está executando o Dataflow Prime. Para mais informações, consulte Como ativar o Dataflow Prime.
    O job observa um backlog alto e uma marca d'água alta. O que preciso verificar?

    Se a remodelação vertical dos workers demorar mais de alguns minutos, o job poderá exibir um backlog alto dos dados de entrada e uma marca d'água alta. Para resolver esse problema, recomendamos que você use contêineres personalizados, já que eles podem melhorar a latência que pode surgir ao remodelar os workers. Se esse problema persistir após o uso de contêineres personalizados, entre em contato com o Suporte ao cliente.

    Reequilíbrio de trabalho dinâmico

    Com o recurso Reequilíbrio dinâmico de trabalho do serviço Dataflow, o serviço reparticiona dinamicamente o trabalho baseado nas condições do tempo de execução. Essas condições podem incluir:

    • Desequilíbrio nas atribuições de trabalho
    • Workers levam mais tempo do que o estimado para finalizar
    • os trabalhadores finalizam antes do tempo estimado

    O serviço Dataflow detecta automaticamente essas condições e reatribui o trabalho para trabalhadores subutilizados ou não utilizados a fim de diminuir o tempo de processamento geral do job.

    Limitações

    O reequilíbrio dinâmico de trabalho acontece apenas quando o serviço do Dataflow está processando dados de entrada em paralelo: ao ler dados de uma fonte de entrada externa, trabalhar com uma PCollection intermediária materializada ou trabalhar com o resultado de uma operação de agregação, como GroupByKey. Se ocorrer a fusão de um grande número de etapas do job, haverá menos PCollections no job e o reequilíbrio dinâmico de trabalho será limitado ao número de elementos na PCollection materializada da fonte. Para garantir que o reequilíbrio dinâmico de trabalho seja aplicado a uma determinada PCollection no pipeline, há algumas maneiras diferentes de evitar a fusão para garantir o carregamento em paralelo dinâmico.

    O reequilíbrio dinâmico de trabalho não pode atuar em dados mais refinados do que um registro único. Se os dados contêm registros individuais que causam grandes atrasos no tempo de processamento, podem ainda atrasar o job, já que o Dataflow não pode subdividir e redistribuir um registro individual "ativo" para vários workers.

    Java

    Se você definiu um número fixo de fragmentos para a saída final do pipeline (por exemplo, gravação de dados usando TextIO.Write.withNumShards), o carregamento em paralelo será limitado com base no número de fragmentos escolhido.

    Python

    Se você definiu um número fixo de fragmentos para a saída final do pipeline (por exemplo, efetuando a gravação de dados usando beam.io.WriteToText(..., num_shards=...)), o carregamento em paralelo será limitado pelo Dataflow com base no número de fragmentos escolhidos.

    A limitação de fragmentos fixos pode ser considerada temporária e estar sujeita a alterações nas futuras versões do serviço Dataflow.

    Trabalhar com origens de dados personalizadas

    Java

    Se o pipeline usar uma fonte de dados personalizada fornecida por você, implemente o método splitAtFraction para que ela funcione com o recurso Reequilíbrio dinâmico de trabalho.

    Python

    Se o pipeline usar uma fonte de dados personalizada fornecida por você, seu RangeTracker precisa implementar try_claim, try_split, position_at_fraction e fraction_consumed para que ela funcione com o recurso Reequilíbrio dinâmico de trabalho.

    Consulte Informações de referência da API sobre RangeTracker para saber mais.

    Gerenciamento e uso de recursos

    O serviço do Dataflow gerencia totalmente os recursos do Google Cloud por job. Isso inclui ativar e encerrar instâncias do Compute Engine, às vezes chamadas deworkers ou VMs, e acessar buckets no Cloud Storage do seu projeto para E/S e preparo de arquivos temporários. No entanto, se o pipeline interagir com as tecnologias de armazenamento de dados do Google Cloud, como o BigQuery e o Pub/Sub, será necessário gerenciar os recursos e a cota desses serviços.

    O Dataflow usa um local fornecido pelo usuário no Cloud Storage especificamente para o preparo de arquivos. Esse local está sob seu controle. É preciso garantir que o tempo de vida do local dure enquanto os jobs fazem leitura nele. O mesmo local de preparação pode ser reutilizado em execuções de vários jobs, pois o armazenamento em cache integrado do SDK pode acelerar o tempo de início dos jobs.

    Vagas

    É possível executar até 25 jobs simultâneos do Dataflow por projeto do Google Cloud. No entanto, esse limite pode ser aumentado entrando em contato com o suporte do Google Cloud. Para mais informações, consulte Cotas.

    No momento, o serviço do Dataflow está limitado a processar solicitações de jobs JSON com 20 MB ou menos. O tamanho da solicitação de job está expressamente vinculado à representação JSON do pipeline. Um pipeline maior aceita solicitações maiores.

    Para estimar o tamanho da solicitação JSON do pipeline, execute-o com a seguinte opção:

    Java

    --dataflowJobFile=< path to output file >
    

    Python

    --dataflow_job_file=< path to output file >
    

    Esse comando grava uma representação JSON do job em um arquivo. O tamanho do arquivo serializado é uma boa estimativa do tamanho da solicitação. O tamanho real é ligeiramente maior em função de algumas informações adicionais incluídas na solicitação.

    Veja mais informações na página de solução de problemas "413 Entidade de solicitação muito grande" / "O tamanho da representação JSON serializada do pipeline excede o limite permitido".

    Além disso, o tamanho do gráfico do job precisa ter menos de 10 MB. Para mais informações, consulte a página da solução do problema "O gráfico do job é muito grande. Tente novamente com um gráfico menor ou divida o job em dois ou mais jobs menores.".

    Workers

    No momento, o serviço do Dataflow comporta, no máximo, 1.000 instâncias do Compute Engine por job. Para jobs em lote, o tipo de máquina padrão é n1-standard-1. Para jobs de streaming, o tipo de máquina padrão para jobs de Streaming Engine-enabled é n1-standard-2 e o tipo de máquina padrão para jobs non-Streaming Engine é n1-standard-4. Portanto, quando você usa os tipos de máquina padrão, o serviço Dataflow aloca até 4.000 núcleos por job. Se você precisar de mais núcleos para o job, poderá selecionar um tipo de máquina maior.

    Você pode usar qualquer uma das famílias de tipos de máquina do Compute Engine disponíveis, bem como tipos de máquinas personalizadas. Para melhores resultados, use tipos de máquina n1. Os tipos de máquina com núcleo compartilhado, como workers da série f1 e g1, não recebem suporte de acordo com o Contrato de nível de serviço do Dataflow.

    O Dataflow cobra pelo número de vCPUs e GB de memória nos workers. O faturamento não depende da família de tipos de máquinas. Para especificar um tipo de máquina para o pipeline, defina o parâmetro de execução apropriado no momento da criação do pipeline.

    Java

    Para alterar o tipo de máquina, defina a opção --workerMachineType.

    Python

    Para alterar o tipo de máquina, defina a opção --worker_machine_type.

    Cota de recursos

    O serviço do Dataflow faz verificações para garantir que o projeto do Google Cloud tenha a cota de recursos do Compute Engine necessária para executar o job, tanto para iniciar quanto para escalonar o job para o número máximo de instâncias de worker. O job não será iniciado se não houver cota de recursos suficiente.

    O recurso de Escalonamento automático é limitado pela cota disponível do Compute Engine do projeto. Se o job tiver cota suficiente quando for iniciado e um outro job usar o restante da cota disponível do projeto, o primeiro job será executado, mas não será totalmente escalonado.

    No entanto, o serviço Dataflow não gerencia aumentos de cota dos jobs que excedem as cotas de recursos do projeto. Todas as solicitações necessárias para a cota de recursos adicionais ficam sob a responsabilidade do usuário. Para realizar essa tarefa, é possível usar o Console do Google Cloud.

    Endereços IP

    Por padrão, o Dataflow atribui endereços IP públicos e privados às VMs de worker. Um endereço IP público atende a um dos critérios de acesso à Internet, mas um endereço IP público também é contabilizado na cota de endereços IP externos.

    Se as VMs de worker não precisarem de acesso à Internet pública, considere usar apenas endereços IP internos, que não são contabilizados na cota externa. Para mais informações sobre como configurar endereços IP, consulte os seguintes recursos:

    Recursos de disco permanente

    O serviço do Dataflow está limitado atualmente a 15 discos permanentes por instância de worker na execução de um job de streaming. Cada disco permanente é local a uma máquina virtual individual do Compute Engine. O job não pode ter mais workers do que discos permanentes. A proporção de 1:1 entre workers e discos é a cota mínima de recursos.

    Os jobs que usam o Streaming Engine usam discos de inicialização de 30 GB. Os jobs que usam o Dataflow Shuffle usam discos de inicialização de 25 GB. Para jobs que não usam essas ofertas, o tamanho padrão de cada disco permanente é 250 GB no modo de lote e 400 GB no modo de streaming.

    Locais

    Por padrão, o serviço do Dataflow implanta recursos do Compute Engine na zona us-central1-f da região us-central1. Para modificar essa configuração, é necessário especificar o parâmetro --region. Se você precisar usar recursos em uma zona específica, utilize o parâmetro --zone ao criar o pipeline. No entanto, recomendamos especificar apenas a região e não definir a zona. Assim, o serviço do Dataflow seleciona automaticamente a melhor zona dentro da região, com base na capacidade disponível no momento da solicitação de criação do job. Para mais informações, consulte a documentação sobre endpoints regionais.

    Streaming Engine

    Atualmente, o sistema de execução de pipelines do Dataflow executa as etapas do pipeline de streaming inteiramente em máquinas virtuais de worker e consome CPU, memória e armazenamento de disco permanente do worker. O Streaming Engine do Dataflow move a execução do pipeline das VMs de worker para o back-end do serviço do Dataflow.

    Benefícios do Streaming Engine

    O modelo do Streaming Engine tem os seguintes benefícios:

    • Redução nos recursos consumidos de CPU, memória e armazenamento de disco permanente nas VMs de worker. O Streaming Engine funciona melhor com tipos de máquina de worker menores (n1-standard-2, em vez de n1-standard-4) e não requer discos permanentes a mais além de um disco pequeno para inicialização do worker, o que reduz o consumo de recursos e cotas.
    • Escalonamento automático com mais resposta a variações no volume de dados recebidos. O Streaming Engine oferece uma transição de escalonamento mais suave e granular dos workers.
    • Melhor suporte, já que você não precisa reimplantar seus pipelines para aplicar atualizações de serviço.

    A maior parte da redução nos recursos do worker é resultado da transferência do trabalho para o serviço do Dataflow. Por esse motivo, usar o Streaming Engine gera uma cobrança. No entanto, espera-se que o faturamento total dos pipelines do Dataflow que usam o Streaming Engine seja quase igual ao daqueles que não usam essa opção.

    Como usar o Streaming Engine

    Esse recurso está disponível em todas as regiões em que o Dataflow é compatível. Para ver os territórios disponíveis, consulte Locais do Dataflow.

    Java

    Para usar o Streaming Engine em pipelines de streaming, especifique o seguinte parâmetro:

    • --enableStreamingEngine, se você estiver usando a versão 2.11.0 ou superior do SDK do Apache Beam para Java.
    • --experiments=enable_streaming_engine, se você estiver usando a versão 2.10.0 do SDK do Apache Beam para Java.

    Se você usar o Dataflow Streaming Engine no pipeline, não especifique o parâmetro --zone. Em vez disso, especifique o parâmetro --region e defina o valor como uma das regiões em que o Streaming Engine está disponível no momento. O Dataflow selecionará automaticamente a zona na região que você especificou. O Dataflow apresentará um erro se você especificar o parâmetro --zone e defini-lo como uma zona fora das regiões disponíveis.

    O Streaming Engine funciona melhor com tipos de máquinas de worker menores. Por isso, recomendamos que você defina --workerMachineType=n1-standard-2. Também é possível definir --diskSizeGb=30 porque o Streaming Engine precisa de espaço apenas para a imagem de inicialização do worker e os registros locais. Esses valores são padrão.

    Python

    O Streaming Engine é ativado por padrão para novos pipelines de streaming do Dataflow quando as seguintes condições são atendidas:

    Se quiser desativar o Streaming Engine no pipeline de streaming do Python, especifique o parâmetro a seguir:

    --experiments=disable_streaming_engine

    Se você usar o Python 2, ainda precisará ativar o Streaming Engine especificando o seguinte parâmetro:

    --enable_streaming_engine

    Se você usar o Dataflow Streaming Engine no seu pipeline, não especifique o parâmetro --zone. Em vez disso, especifique o parâmetro --region e defina o valor como uma das regiões em que o Streaming Engine está disponível no momento. O Dataflow selecionará automaticamente a zona na região que você especificou. O Dataflow apresentará um erro se você especificar o parâmetro --zone e defini-lo como uma zona fora das regiões disponíveis.

    O Streaming Engine funciona melhor com tipos de máquinas de worker menores. Por isso, recomendamos que você defina --machine_type=n1-standard-2. Também é possível definir --disk_size_gb=30 porque o Streaming Engine precisa de espaço apenas para a imagem de inicialização do worker e os registros locais. Esses valores são padrão.

    Dataflow Shuffle

    O Dataflow Shuffle é a operação básica por trás das transformações do Dataflow, como GroupByKey, CoGroupByKey e Combine. A operação do Dataflow Shuffle particiona e agrupa os dados por chave de maneira escalonável, eficiente e tolerante a falhas. Atualmente, o Dataflow usa uma implementação de embaralhamento que é executada totalmente em máquinas virtuais de worker e consome CPU, memória e armazenamento em disco permanente do worker. O recurso Dataflow Shuffle baseado em serviços, disponível somente para pipelines em lote, move a operação de embaralhamento das VMs de worker para o back-end do serviço do Dataflow.

    Os jobs em lote usam o Dataflow Shuffle por padrão.

    Benefícios do Dataflow Shuffle

    O Dataflow Shuffle baseado em serviços oferece os seguintes benefícios:

    • Tempo de execução mais rápido de pipelines em lote para a maioria dos tipos de job de pipeline.
    • Redução nos recursos consumidos de CPU, memória e armazenamento de disco permanente nas VMs de worker.
    • Melhor escalonamento automático horizontal, porque as VMs não têm mais dados embaralhados e, portanto, podem ser reduzidas mais cedo.
    • Melhor tolerância a falhas. Uma VM não íntegra que detiver dados do Dataflow Shuffle não causará falha em todo o job, como aconteceria se o recurso não estivesse sendo usado.

    A maior parte da redução nos recursos do worker é motivada pela transferência do trabalho de embaralhamento para o serviço do Dataflow. Por esse motivo, há uma cobrança associada ao uso do Dataflow Shuffle. No entanto, espera-se que o faturamento total dos pipelines que usam a implementação do Dataflow com base em serviço seja menor ou igual ao daqueles que não usam essa opção.

    Para a maioria dos tipos de job de pipeline, espera-se que o Dataflow Shuffle seja executado mais rapidamente do que a implementação de embaralhamento em execução nas VMs de worker. No entanto, os tempos de execução podem variar. Se você estiver executando um pipeline com prazos importantes, recomendamos alocar tempo de espera suficiente antes do prazo final. Além disso, considere solicitar uma cota maior para o Shuffle.

    Exceções para usar o Dataflow Shuffle

    Os jobs em lote usam o Dataflow Shuffle por padrão. O tamanho do disco de inicialização de cada job em lote é reduzido para 25 GB em vez dos 250 GB padrão. Para alguns jobs em lote, talvez seja necessário desativar o Dataflow Shuffle ou modificar o tamanho do disco. Considere o seguinte:

    • Uma VM de worker usa parte dos 25 GB de espaço em disco para o sistema operacional, binários, registros e contêineres. Quando você usa o Dataflow Shuffle, os jobs que usam uma quantidade significativa de disco e excedem a capacidade restante podem falhar.
    • Jobs que usam muita E/S de disco podem ser lentos devido ao desempenho do disco pequeno. Veja mais informações sobre as diferenças de desempenho entre os tamanhos de disco na página Desempenho de disco permanente do Compute Engine.

    Para especificar um tamanho de disco maior para um job do Dataflow Shuffle, use o parâmetro --disk_size_gb.

    Para desativar o Dataflow Shuffle, consulte a seção a seguir.

    Como usar o Dataflow Shuffle

    Esse recurso está disponível em todas as regiões em que o Dataflow é compatível. Para ver os territórios disponíveis, consulte Locais do Dataflow. Se você usar o Dataflow Shuffle, os workers precisarão ser implantados na mesma região do endpoint regional.

    Java

    Os jobs em lote usam o Dataflow Shuffle por padrão. Para desativar o uso do Dataflow Shuffle, especifique a seguinte opção de pipeline:
    --experiments=shuffle_mode=appliance.

    Se você usa o Dataflow Shuffle no pipeline, não especifique as opções zone. Em vez disso, especifique o region e defina o valor como uma das regiões em que o Shuffle está disponível. O Dataflow selecionará automaticamente a zona na região que você especificou. O Dataflow apresentará um erro se você especificar a opção de pipeline zone e defini-la como uma zona fora das regiões disponíveis. Se você definir uma combinação incompatível entre region e zone, o job não poderá usar o Dataflow Shuffle.

    Python

    Os jobs em lote usam o Dataflow Shuffle por padrão. Para desativar o uso do Dataflow Shuffle, especifique a seguinte opção de pipeline:
    --experiments=shuffle_mode=appliance.

    Se você usa o Dataflow Shuffle no pipeline, não especifique as opções zone. Em vez disso, especifique o region e defina o valor como uma das regiões em que o Shuffle está disponível. O Dataflow selecionará automaticamente a zona na região que você especificou. O Dataflow apresentará um erro se você especificar a opção de pipeline zone e defini-la como uma zona fora das regiões disponíveis. Se você definir uma combinação incompatível entre region e zone, o job não poderá usar o Dataflow Shuffle.

    Programação flexível de recursos do Dataflow

    O Dataflow FlexRS reduz os custos de processamento em lote usando técnicas avançadas de programação, o serviço do Dataflow Shuffle e uma combinação de instâncias de máquina virtual (VM) preemptiva e VMs comuns. Ao executar VMs preemptivas e VMs comuns em paralelo, o Dataflow melhora a experiência do usuário caso o Compute Engine interrompa as instâncias de VM preemptiva durante um evento do sistema. O FlexRS ajuda a garantir que o pipeline continue a progredir e que você não perca o trabalho anterior quando o Compute Engine força a interrupção das VMs preemptivas. Para mais informações sobre o FlexRS, consulte Como usar a programação flexível de recursos no Dataflow.

    Dataflow Runner v2

    O executor atual do Dataflow de produção utiliza workers específicos da linguagem ao executar pipelines do Apache Beam. Para melhorar a escalonabilidade, a generalidade, a extensibilidade e a eficiência, o executor do Dataflow está migrando para uma arquitetura mais baseada em serviços. Essas alterações incluem uma arquitetura de worker mais eficiente e portátil empacotada com o Shuffle Service e o Streaming Engine.

    O Dataflow Runner v2 é compatível com as seguintes configurações:

    Java

    O Dataflow Runner v2 requer o SDK do Apache Beam para Java na versão 2.30.0 ou posterior.

    Para ativar o Runner v2, execute o job com a seguinte sinalização: --experiments=use_runner_v2.

    Python

    O Dataflow Runner v2 requer a versão 2.21.0 ou posterior do SDK do Apache Beam para Python. Runner v2 é o padrão para pipelines em lote do Python (versão 2.21.0 ou posterior) que foram enviados a partir de 1º de fevereiro de 2021 e pipelines de streaming do Python (versão 2.21.0 ou posterior). Não é necessário fazer alterações no código do pipeline para aproveitar essa nova arquitetura.

    Em determinadas circunstâncias, seu pipeline pode não usar o Runner v2, embora ele seja executado em uma versão compatível do SDK. Alguns exemplos são jobs de modelo do Dataflow ou jobs em que a opção "Agrupar por chave" é usada como entrada secundária. Nesses casos, é possível executar o job com a sinalização --experiments=use_runner_v2.

    Benefícios do uso do Dataflow Runner v2

    Novos recursos estarão disponíveis somente no Dataflow Runner v2. Além disso, a eficiência aprimorada da arquitetura do Dataflow Runner v2 pode levar a melhorias de desempenho nos jobs do Dataflow.

    Ao usar o Dataflow Runner v2, você notará uma redução na sua fatura.

    O Dataflow Runner v2 também permite pré-criar o contêiner Python, o que pode melhorar os tempos de inicialização da VM e o desempenho do escalonamento automático. Para testar esse recurso experimental, ative a API Cloud Build no seu projeto e envie seu pipeline com o seguinte parâmetro:
    --prebuild_sdk_container_engine=cloud_build.

    Quando o job for concluído ou interrompido, remova a imagem pré-criada do Container Registry. O URL da imagem pode ser encontrado na IU de monitoramento do Dataflow em Opções de pipeline.

    O Dataflow Runner v2 suporta pipelines em vários idiomas, um recurso que permite que o pipeline do Apache Beam use transformações definidas em outros SDKs do Apache Beam. Atualmente, o Dataflow Runner v2 suporta o uso de transformações Java de um pipeline SDK do Python (Pré-lançamento).

    Como usar o Dataflow Runner v2

    O Dataflow Runner v2 está disponível em regiões com endpoints regionais do Dataflow. Enquanto isso, se você quiser testar o Dataflow Runner v2, use o seguinte parâmetro:
    --experiments=use_runner_v2

    Como depurar jobs do Dataflow Runner v2

    Para depurar jobs usando o Dataflow Runner v2, siga as etapas de depuração padrão. No entanto, esteja ciente do seguinte ao usar o Dataflow Runner v2:

    • Os jobs do Dataflow Runner v2 executam dois tipos de processos na VM do worker: o processo do SDK e o processo de proteção do executor. Dependendo do pipeline e do tipo de VM, pode haver um ou mais processos do SDK, mas há apenas um processo de proteção do executor por VM.
    • Os processos do SDK executam o código do usuário e outras funções específicas da linguagem, enquanto o processo de proteção do executor gerencia todo o restante.
    • O processo de aproveitar o executor aguarda que todos os processos do SDK se conectem a ele antes de começar a solicitar trabalho do Dataflow.
    • Os jobs atrasar se a VM do worker fizer o download e instalar dependências durante a inicialização do processo do SDK. Se houver problemas em um processo do SDK, como inicialização ou instalação de bibliotecas, o worker informará seu status como não íntegro. Se os tempos de inicialização aumentarem, ative a API Cloud Build no seu projeto e envie seu pipeline com o seguinte parâmetro:
      --prebuild_sdk_container_engine=cloud_build.
    • Os registros de VM de worker (disponíveis por meio do Logs Explorer ou da interface de monitoramento do Dataflow) incluem registros do processo de aproveitamento do executor, bem como registros dos processos do SDK.
    • Para diagnosticar problemas no seu código de usuário, examine os registros do worker dos processos do SDK. Se você encontrar erros nos registros da proteção do executor, entre em contato com o suporte para registrar um bug.

    VM protegida do Dataflow

    Instrua o serviço Dataflow a usar workers da VM protegida. Saiba mais sobre as capacidades de VM protegida na VM protegida.

    Java

    Para usar workers de VM protegida, especifique o seguinte parâmetro --dataflowServiceOptions=enable_secure_boot.

    Python

    Para usar workers de VM protegida, especifique o seguinte parâmetro --dataflow_service_options=enable_secure_boot.