Fazer upgrade de um pipeline de streaming

Nesta página, você encontra orientações e recomendações para fazer um upgrade dos pipelines de streaming. Por exemplo, talvez seja necessário fazer um upgrade para uma versão mais recente do SDK do Apache Beam ou atualizar o código do pipeline. Diferentes opções são fornecidas para atender a diferentes cenários.

Ao contrário dos pipelines em lote, que são interrompidos quando o job é concluído, pipelines de streaming costumam ser executados continuamente para fornecer um processamento ininterrupto. Portanto, ao fazer o upgrade de pipelines de streaming, você precisa considerar o seguinte:

  • Talvez seja necessário minimizar ou evitar a interrupção do pipeline. Em alguns casos, é possível tolerar uma interrupção temporária no processamento enquanto uma nova versão de um pipeline é implantada. Em outros casos, o aplicativo pode não tolerar qualquer interrupção.
  • Os processos de atualização de pipeline precisam lidar com as alterações de esquema de uma maneira que minimize a interrupção do processamento de mensagens e para outros sistemas anexados. Por exemplo, se o esquema de mensagens em um pipeline de processamento de eventos for alterado, as alterações de esquema também poderão ser necessárias em coletores de dados downstream.

É possível usar um dos métodos a seguir para atualizar pipelines de streaming, dependendo do pipeline e dos requisitos de atualização:

Para mais informações sobre problemas que podem ser encontrados durante uma atualização e como evitá-los, consulte Validar um job substituto e Verificação de compatibilidade do job.

Práticas recomendadas

  • Faça o upgrade da versão do SDK do Apache Beam separadamente de qualquer alteração no código do pipeline.
  • Teste o pipeline após cada alteração antes de fazer outras atualizações.
  • Faça o upgrade regularmente da versão do SDK do Apache Beam usado pelo pipeline.

Realizar atualizações em andamento

É possível atualizar alguns pipelines de streaming em andamento sem interromper o job. Esse cenário é chamado de atualização de job em andamento. As atualizações de jobs em andamento estão disponíveis apenas em circunstâncias limitadas:

  • O job deve usar o Streaming Engine.
  • O job precisa estar no estado de execução.
  • Você está alterando apenas o número de workers usados pelo job

Para mais informações, consulte Definir o intervalo de escalonamento automático na página "Escalonamento automático horizontal".

Para instruções sobre como executar uma atualização de job em andamento, consulte Atualizar um pipeline atual.

Iniciar um job de substituição

Se o job atualizado for compatível com o job atual, será possível atualizar o pipeline usando a opção update. Quando você substitui um job atual, um novo job executa o código atualizado do pipeline. O serviço do Dataflow retém o nome do job, mas executa o substituto com um código da tarefa atualizado. Esse processo pode causar inatividade enquanto o job atual é interrompido, a verificação de compatibilidade é executada e o novo job é iniciado. Para mais detalhes, consulte Os efeitos da substituição de um job.

O Dataflow executa uma verificação de compatibilidade para garantir que o código atualizado do pipeline possa ser implantado com segurança no pipeline em execução. Certas mudanças no código causam falhas na verificação de compatibilidade, como quando entradas secundárias são adicionadas ou removidas de uma etapa existente. Quando a verificação de compatibilidade falha, não é possível atualizar um job no local.

Para instruções explicando como iniciar um job substituto, consulte Iniciar um job substituto.

Se a atualização do pipeline for incompatível com o job atual, interrompa e substitua o pipeline. Se o pipeline não puder tolerar a inatividade, execute pipelines paralelos.

Interromper e substituir pipelines

Se for possível interromper temporariamente o processamento, cancele ou drene o pipeline e substitua-o pelo pipeline atualizado. O cancelamento de um pipeline faz com que o Dataflow interrompa imediatamente o processamento e encerre os recursos o mais rápido possível, o que pode causar perda de dados em processamento, conhecidos como dados em trânsito. Para evitar a perda de dados, na maioria dos casos, a drenagem é preferível. Também é possível usar snapshots do Dataflow para salvar o estado de um pipeline de streaming, o que permite iniciar uma nova versão do job do Dataflow sem perder o estado. Para mais informações, acesse Usar snapshots do Dataflow.

A drenagem de um pipeline imediatamente fecha qualquer janela em processamento e aciona todos os gatilhos (links em inglês). Embora os dados em trânsito não sejam perdidos, a drenagem pode fazer com que as janelas tenham dados incompletos. Se isso acontecer, as janelas em processo emitem resultados parciais ou incompletos. Para mais informações, consulte Efeitos da drenagem de um job. Depois que o job atual for concluído, será possível iniciar um novo job de streaming que contenha o código de pipeline atualizado, permitindo a retomada do processamento.

Com esse método, há um período de inatividade entre o momento em que o job de streaming atual é interrompido e o momento em que o pipeline substituto está pronto para retomar o processamento de dados. No entanto, cancelar ou drenar um pipeline atual e iniciar um novo job com o pipeline atualizado é menos complicado do que executar pipelines paralelos.

Para instruções mais detalhadas, consulte Drenar um job do Dataflow. Depois de drenar o job atual, inicie um novo com o mesmo nome.

Reprocessamento de mensagens usando o Pub/Sub Snapshot e Seek

Em algumas situações, depois de substituir ou cancelar um pipeline drenado, talvez seja necessário reprocessar as mensagens do Pub/Sub entregues anteriormente. Por exemplo, talvez seja necessário usar a lógica de negócios atualizada para reprocessar dados. O Pub/Sub Seek é um recurso que permite reproduzir mensagens em um snapshot do Pub/Sub. Use o Pub/Sub Seek com o Dataflow para reprocessar mensagens a partir do momento em que o snapshot da assinatura é criado.

Durante o desenvolvimento e o teste, também é possível usar o Pub/Sub Seek para reproduzir as mensagens conhecidas repetidamente para verificar a saída do pipeline. Ao usar a busca do Pub/Sub Seek, não pesquise um snapshot de assinatura quando a assinatura estiver sendo consumida por um pipeline. Se você fizer isso, ele poderá invalidar a lógica de marca d'água do Dataflow e afetar exatamente o processamento de mensagens do Pub/Sub.

Veja a seguir um fluxo de trabalho recomendado da gcloud CLI para usar o Pub/Sub Seek com pipelines do Dataflow em uma janela do terminal:

  1. Para criar um snapshot da assinatura, use o comando gcloud pubsub snapshots create:

    gcloud pubsub snapshots create SNAPSHOT_NAME --subscription=PIPELINE_SUBSCRIPTION_NAME
    
  2. Para drenar ou cancelar o pipeline, use o comando gcloud dataflow jobs drain ou gcloud dataflow jobs cancel:

    gcloud dataflow jobs drain JOB_ID
    

    ou

    gcloud dataflow jobs cancel JOB_ID
    
  3. Para buscar o snapshot, use o comando gcloud pubsub subscriptions seek:

    gcloud pubsub subscriptions seek SNAPSHOT_NAME
    
  4. Implante um novo pipeline que consuma a assinatura.

Executar pipelines paralelos

Se for necessário evitar a interrupção do pipeline de streaming durante uma atualização, execute pipelines paralelos. Crie um novo job de streaming que tenha o código de pipeline atualizado e execute o novo pipeline em paralelo com o atual.

Ao criar o novo pipeline, use a mesma estratégia de janelamento usada para o pipeline atual. Deixe o pipeline existente continuar em execução até que a marca-d'água exceda o carimbo de data/hora da janela completa mais antiga processada pelo pipeline atualizado. Em seguida, esvazie ou cancele o pipeline existente. O pipeline atualizado continua em execução no lugar e assume o processamento de maneira efetiva por conta própria.

O diagrama a seguir é uma ilustração desse processo:

O pipeline B se sobrepõe ao pipeline B por uma janela de cinco minutos.

No diagrama, o pipeline B é o job atualizado que assume o pipeline A. O valor t é o carimbo de data/hora da janela completa mais antiga processada pelo pipeline B. O valor w é a marca d'água do pipeline A. Para simplificar, uma marca d'água perfeita é presumida sem dados atrasados. O processamento e o tempo decorrido são representados no eixo horizontal. Os dois pipelines usam janelas fixas de cinco minutos. Os resultados são acionados depois que a marca d'água passa o fim de cada janela.

Como a saída simultânea ocorre durante o período em que os dois pipelines se sobrepõem, configure-os para gravar resultados em destinos diferentes. Os sistemas downstream podem usar uma abstração sobre os dois coletores de destino, como uma visualização do banco de dados, para consultar os resultados combinados. Esses sistemas também podem usar a abstração para eliminar a duplicação dos resultados do período sobreposto.

O exemplo a seguir descreve a abordagem de uso de um pipeline simples que lê dados de entrada do Pub/Sub, realiza um pouco do processamento e grava os resultados no BigQuery.

  1. No estado inicial, o pipeline de streaming atual (Pipeline A) está em execução e lendo mensagens de um tópico do Pub/Sub (Tópico) usando uma assinatura. (Assinatura A). Os resultados são gravados em uma tabela do BigQuery (tabela A). Os resultados são consumidos por uma visualização do BigQuery, que atua como uma fachada para mascarar alterações de tabela subjacentes. Esse processo é uma aplicação de um método de design chamado padrão de fachada. O diagrama a seguir mostra o estado inicial.

    Um pipeline com uma assinatura e gravação em uma única tabela do BigQuery.

  2. Você cria uma nova assinatura (assinatura B) para o pipeline atualizado. Implante o pipeline atualizado (pipeline B ), que lê o tópico do Pub/Sub (tópico) usando a assinatura B e grava em uma tabela separada do BigQuery (tabela B). Veja esse fluxo no diagrama a seguir.

    Dois pipelines, cada um com uma assinatura. Cada pipeline grava em uma tabela separada do BigQuery. Uma visualização de fachadas lê as duas tabelas.

    Neste ponto, o pipeline A e o pipeline B estão sendo executados em paralelo e gravando resultados em tabelas separadas. Registre o tempo t como o carimbo de data/hora da janela completa mais antiga processada pelo pipeline B.

  3. Quando a marca d'água do Pipeline A exceder o tempo t, drene o Pipeline A. Quando você drena o pipeline, todas as janelas abertas são fechadas e o processamento dos dados em trânsito é concluído. Se o pipeline contiver janelas e aquelas completas forem importantes (supondo que não haja dados atrasados), antes de drenar o pipeline A, deixe que os dois pipelines sejam executados até que você tenha janelas sobrepostas completas. Você interrompe o job de streaming do pipeline A depois que todos os dados em trânsito são processados e gravados na tabela A. O diagrama a seguir mostra esse estágio.

    O pipeline A é drenado e não lê mais a assinatura A, além de não enviar mais dados para a tabela A após a conclusão da drenagem. Todo o processamento é feito pelo segundo pipeline.

  4. Neste momento, apenas o pipeline B está em execução. É possível consultar em uma visualização do BigQuery (Visualização de Fachada), que atua como uma fachada para a tabela A e a tabela B. Para linhas com o mesmo carimbo de data/hora em ambas as tabelas, configure a visualização para retornar as linhas da Tabela B ou, se as linhas não existirem na Tabela B , volte para a Tabela A , O diagrama a seguir mostra a visualização (Visualização de Fachada)lendo na Tabela A e na Tabela B.

    O pipeline A desapareceu e apenas o pipeline B é executado.

    Nesse momento, você pode excluir a assinatura A.

Quando problemas são detectados com uma nova implantação de pipeline, ter pipelines paralelos pode simplificar a reversão. No exemplo anterior, convém manter o pipeline A em execução enquanto você monitora o pipeline B para a operação correta. Se ocorrer algum problema com o pipeline B, você pode reverter para o pipeline A.

Limitações

Essa abordagem tem as seguintes limitações:

  • A execução de dois pipelines na mesma entrada provavelmente vai gerar dados duplicados na saída. O sistema downstream precisa estar ciente dos dados duplicados e ser capaz de tolerá-los.
  • Ao ler de uma origem do Pub/Sub, o uso da mesma assinatura para vários pipelines não é recomendado e pode levar a problemas de correção. No entanto, em alguns casos de uso, como pipelines de extração, transformação e carregamento (ETL, na sigla em inglês), o uso da mesma assinatura em dois pipelines pode reduzir a duplicação. Os problemas com o escalonamento automático provavelmente ocorrem nesse cenário, mas podem ser atenuados usando o recurso de atualização de jobs em andamento. Para mais informações, consulte Ajustar o escalonamento automático para seus pipelines de streaming do Pub/Sub.
  • Ao ler de uma origem do Pub/Sub, o uso de uma segunda assinatura gera cópias, mas não causa problemas com precisão e escalonamento de dados de dados.

Gerenciar mutações de esquema

Os sistemas de gerenciamento de dados geralmente precisam acomodar mutações de esquema ao longo do tempo, às vezes devido a mudanças nos requisitos de negócios e outras vezes por motivos técnicos. A aplicação de atualizações de esquema normalmente requer planejamento e execução cuidadosos para evitar interrupções nos sistemas de informações comerciais.

Considere um pipeline simples que leia mensagens com payloads JSON de um tópico do Pub/Sub. O pipeline converte cada mensagem em uma instância TableRow e depois grava as linhas em uma tabela do BigQuery. O esquema da tabela de saída é semelhante às mensagens processadas pelo pipeline. No diagrama a seguir, o esquema é chamado de esquema A.

Pipeline que lê uma assinatura e grava em uma tabela de saída do BigQuery usando o esquema A.

Com o tempo, o esquema da mensagem pode passar por mudanças não triviais. Por exemplo, os campos são adicionados, removidos ou substituídos. O esquema A evolui para um novo esquema. Na discussão a seguir, o novo esquema é chamado de esquema B. Nesse caso, o pipeline A precisa ser atualizado, e o esquema da tabela de saída precisa ser compatível com o esquema B.

Para a tabela de saída, é possível executar algumas mutações de esquema sem o conteúdo central. Por exemplo, é possível adicionar novos campos ou relaxar modos de colunas, como alterar REQUIRED para NULLABLE, sem inatividade. Essas mutações geralmente não afetam as consultas atuais. No entanto, as mutações de esquema que modificam ou removem campos de esquema atuais interrompem as consultas ou resultam em outras interrupções. A abordagem a seguir acomoda alterações sem exigir tempo de inatividade.

Separe os dados gravados pelo pipeline em uma tabela principal e em uma ou mais tabelas de preparo. A tabela principal armazena dados históricos gravados pelo pipeline. As tabelas de preparo armazenam a última saída do pipeline. É possível definir uma visualização de fachada do BigQuery sobre as tabelas principais e de preparo, o que permite aos consumidores consultar dados históricos e atualizados.

No diagrama a seguir, o fluxo de pipeline anterior é revisado para incluir uma tabela de preparo (tabela de preparo A), uma tabela principal e uma visualização de fachada.

Pipeline que lê uma assinatura e grava em uma tabela de preparo do BigQuery. Uma segunda tabela (principal) tem saída de uma versão anterior do esquema. Uma visualização de fachada faz a leitura tanto da tabela de preparo quanto da tabela principal.

No fluxo revisado, o pipeline A processa mensagens que usam o esquema A e grava a saída na tabela de preparo A, que tem um esquema compatível. A tabela principal contém dados históricos gravados por versões anteriores do pipeline, além de resultados que são periodicamente mesclados na tabela de preparo. Os consumidores podem usar a visualização de fachada para consultar dados atualizados, incluindo dados históricos e em tempo real.

Quando o esquema da mensagem é alterado do esquema A para esquema B, é possível atualizar o código do pipeline para que seja compatível com mensagens que usam o esquema B. O pipeline atual precisa ser atualizado com a nova implementação. Ao executar pipelines paralelos, você garante que o processamento de dados de streaming continue sem interrupção. Encerrar e substituir pipelines resulta em uma interrupção no processamento, porque nenhum pipeline está em execução por um período.

O pipeline atualizado grava em uma tabela de preparo extra (tabela de preparo B) que usa o esquema B. É possível usar um fluxo de trabalho orquestrado para criar a nova tabela de preparo antes de atualizar o pipeline. Atualize a visualização de fachada para incluir os resultados da nova tabela de preparo, possivelmente usando uma etapa relacionada ao fluxo de trabalho.

No diagrama a seguir, mostramos o fluxo atualizado que mostra a tabela de preparo B com o esquema B e como a visualização de fachada foi atualizada para incluir conteúdo da tabela principal e das duas tabelas de preparo.

O pipeline agora usa o Esquema B e grava na Tabela de preparo B. Uma visualização de fachada lendo da tabela principal, da tabela de preparo A e da tabela de preparo B.

Como um processo separado da atualização do pipeline, é possível mesclar as tabelas de preparo na tabela principal, periodicamente ou conforme necessário. O diagrama a seguir mostra como a tabela de preparo A é mesclada na tabela principal.

A tabela de preparo A está mesclada com a tabela principal. A visualização de fachada lê da tabela de preparação B e da tabela principal.

A seguir