Interromper um pipeline do Dataflow em execução

Para interromper um job do Dataflow, use o console do Google Cloud, o Cloud Shell, um terminal local instalado com a Google Cloud CLI ou a API REST Dataflow.

É possível interromper um job do Dataflow de uma das três maneiras a seguir:

  • Cancelar um job. Este método serve para pipelines de streaming e em lote. O cancelamento de um job interrompe o processamento de dados, incluindo dados em buffer, pelo serviço do Dataflow. Para mais informações, consulte Cancelar um job.

  • Drenar um job. Esse método serve apenas para pipelines de streaming. A drenagem de um job permite que o serviço do Dataflow conclua o processamento dos dados em buffer, interrompendo simultaneamente a ingestão de novos dados. Para mais informações, consulte Drenar um job.

  • Forçar o cancelamento de um job. Este método serve para pipelines de streaming e em lote. Forçar o cancelamento de um job interrompe imediatamente o serviço do Dataflow de processar todos os dados, incluindo dados armazenados em buffer. Antes do cancelamento forçado, você deve tentar um cancelamento regular. O cancelamento força é destinado apenas a jobs que ficaram presos no processo normal de cancelamento. Confira mais informações em Forçar o cancelamento de um job.

Não é possível reiniciar um job cancelado. Se você não estiver usando modelos Flex, pode clonar o pipeline cancelado e iniciar um novo job com o pipeline clonado.

Antes de interromper um pipeline de streaming, crie um snapshot do job. Os snapshots do Dataflow salvam o estado de um pipeline de streaming, o que permite iniciar uma nova versão do job do Dataflow sem perder o estado. Saiba mais em Usar snapshots do Dataflow.

Se você tiver um pipeline complexo, crie um modelo e execute o job usando esse modelo.

Não é possível excluir jobs do Dataflow, mas é possível arquivá-los se estiverem concluídos. Todos os jobs concluídos são excluídos após um período de armazenamento de 30 dias.

Cancelar um job do Dataflow

Quando você cancela um job, o serviço do Dataflow o interrompe imediatamente.

As seguintes ações ocorrem ao cancelar um job:

  1. O serviço do Dataflow interrompe a ingestão e o processamento de dados.

  2. O serviço do Dataflow começa pela limpeza dos recursos do Google Cloud anexados ao job.

    Entre esses recursos, podem estar o desligamento de instâncias de worker do Compute Engine e o encerramento de conexões ativas com coletores ou fontes de E/S.

Informações importantes sobre o cancelamento de um job

  • O cancelamento de um job interrompe imediatamente o processamento do pipeline.

  • Ao cancelar um job, talvez você perca dados em trânsito. Dados em trânsito são os dados que já foram lidos, mas ainda estão sendo processados pelo pipeline.

  • Os dados gravados do pipeline em um coletor de saída antes de você cancelar o job ainda poderão ser acessados no coletor de saída.

  • Se a perda de dados não for um problema, cancelar o job garantirá que os recursos do Google Cloud associados a ele sejam encerrados o mais rápido possível.

Drenar um job do Dataflow

Quando você drena um job, o serviço do Dataflow conclui o job no estado atual. Para evitar a perda de dados ao desativar os pipelines de streaming, a melhor opção é drenar o job.

As seguintes ações ocorrem ao drenar um job:

  1. O job para de ingerir novos dados de fontes de entrada logo após receber a solicitação de drenagem (normalmente, em alguns minutos).

  2. O serviço do Dataflow preserva todos os recursos atuais, como instâncias de worker, para concluir o processamento e a gravação dos dados em buffer no pipeline.

  3. Quando todas as operações de processamento e gravação pendentes forem concluídas, o serviço do Dataflow encerrará os recursos do Google Cloud associados ao job.

Para drenar o job, o Dataflow interrompe a leitura de novas entradas, marca a origem com um carimbo de data/hora de evento no infinito e propaga os carimbos de data/hora de infinito pelo pipeline. Portanto, os pipelines no processo de drenagem podem ter uma marca d'água infinita.

Informações importantes sobre a drenagem de um job

  • Não é possível drenar um job de pipelines em lote.

  • O pipeline continua gerando custos de manutenção de todos os recursos associados do Google Cloud até que todos os processamentos e gravações sejam concluídos.

  • É possível atualizar um pipeline que está sendo drenado. Se o pipeline estiver travado, a atualização do pipeline com código que corrige o erro que está criando o problema permite uma drenagem bem-sucedida sem perda de dados.

  • É possível cancelar um job que está sendo drenado.

  • A drenagem de um job pode demorar bastante para ser concluída, por exemplo, quando o pipeline tem uma grande quantidade de dados em buffer.

  • Se o pipeline de streaming incluir um Splittable DoFn, será necessário truncar o resultado antes de executar a opção de drenagem. Saiba mais sobre como truncar Splittable DoFns na documentação do Apache Beam.

  • Em alguns casos, pode ser que o job do Dataflow não consiga concluir a operação de drenagem. Consulte os registros do job para encontrar a causa raiz e tomar as medidas adequadas.

Retenção de dados

  • O streaming do Dataflow é tolerante a workers que reiniciam e não falha em jobs de streaming quando ocorrem erros. Em vez disso, o serviço Dataflow é repetido até que você execute uma ação, como cancelar ou reiniciar o job. Quando você drena o job, o Dataflow continua a tentar novamente, o que pode levar a pipelines travados. Nessa situação, para ativar uma drenagem bem-sucedida sem perda de dados, atualize o pipeline com o código que corrige o erro que está criando o problema.

  • O Dataflow não reconhece mensagens até que elas sejam confirmadas de maneira durável pelo serviço do Dataflow. Por exemplo, com o Kafka, é possível ver esse processo como uma transferência segura de propriedade da mensagem do Kafka para o Dataflow, eliminando o risco de perda de dados.

Jobs travados

  • A drenagem não corrige pipelines travados. Se o movimento de dados estiver bloqueado, o pipeline permanecerá parado após o comando de drenagem. Para resolver um pipeline travado, use o comando update para atualizar o pipeline com código que resolva o erro que está criando o problema. Também é possível cancelar jobs travados, mas cancelar jobs pode resultar em perda de dados.

Temporizadores

  • Se o código do pipeline de streaming incluir um timer em loop, o job poderá ficar lento ou incapaz de drenar. Como a drenagem não termina até que todos os timers sejam concluídos, os pipelines com timers de loop infinito nunca terminam.

  • O Dataflow aguarda a conclusão de todos os timers de tempo de processamento em vez de acioná-los imediatamente, o que pode resultar em uma drenagem lenta.

Efeitos da drenagem de um job

Quando você drena um pipeline de streaming, o Dataflow fecha imediatamente qualquer janela em processamento e aciona todos os gatilhos.

O sistema não espera nenhuma janela pendente baseada em tempo terminar em uma operação de drenagem.

Por exemplo, ao drenar um job, se o pipeline estiver há apenas 10 minutos em uma janela de duas horas, o Dataflow não aguardará o tempo restante para a conclusão da janela. Ele fechará a janela imediatamente com resultados parciais. O Dataflow faz as janelas abertas serem fechadas avançando a marca-d'água dos dados para o infinito. Essa funcionalidade também é válida para fontes de dados personalizadas.

Ao drenar um pipeline que usa uma classe de fonte de dados personalizada, o Dataflow interrompe a emissão de solicitações de novos dados, avança a marca-d'água dos dados para o infinito e chama o método finalize() da sua fonte no último checkpoint.

A drenagem pode resultar em janelas parcialmente preenchidas. Nesse caso, se você reiniciar o pipeline drenado, a mesma janela poderá ser acionada uma segunda vez, o que pode causar problemas com seus dados. Por exemplo, no cenário a seguir, os arquivos podem ter nomes conflitantes, e os dados podem ser substituídos:

Se você drenar um pipeline com janelamento por hora às 12h34, a janela das 12h às 13h será fechada apenas com os dados disparados nos primeiros 34 minutos da janela. O pipeline não lê novos dados após as 12h34.

Se você reiniciar imediatamente o pipeline, a janela das 12h às 13h será acionada novamente, apenas com os dados lidos das 12h35 às 13h. Nenhuma cópia é enviada, mas se um nome de arquivo for repetido, os dados serão substituídos.

No console do Google Cloud, é possível ver os detalhes das transformações do pipeline. O diagrama a seguir mostra os efeitos de uma operação de drenagem em processo. Observe que a marca-d'água está avançada para o valor máximo.

Uma visualização em etapas de uma operação de drenagem.

Figura 1. Uma visualização em etapas de uma operação de drenagem.

Forçar o cancelamento de um job do Dataflow

Use o cancelamento forçado somente quando não for possível cancelar o job usando outros métodos. Forçar o cancelamento encerra o job sem limpar todos os recursos. Se você usar o cancelamento forçado repetidamente, os recursos vazados poderão se acumular e usarão sua cota.

Quando você força o cancelamento de um job, o serviço do Dataflow interrompe o job imediatamente, vazando todas as VMs criadas pelo job. O cancelamento normal precisa ser feito pelo menos 30 minutos antes do cancelamento forçado.

As seguintes ações ocorrem ao cancelar um job:

  • O serviço do Dataflow interrompe a ingestão e o processamento de dados.

Informações importantes sobre o cancelamento de um job

  • O cancelamento de um job interrompe imediatamente o processamento do pipeline.

  • O cancelamento força de um job é destinado apenas a jobs que ficaram presos no processo normal de cancelamento.

  • As instâncias de worker que o job do Dataflow criou não são liberadas necessariamente, o que pode resultar em vazamentos de instâncias de worker. As instâncias de worker vazadas não contribuem para os custos do job, mas podem usar sua cota. Após a conclusão do cancelamento do job, será possível excluir esses recursos.

    Nos jobs do Dataflow Prime, não é possível ver ou excluir VMs com vazamento. Na maioria dos casos, essas VMs não criam problemas. No entanto, se elas causarem problemas como o consumo da sua cota de VMs, entre em contato com o suporte.

Interromper um job do Dataflow

Antes de interromper um job, entenda os efeitos de cancelamento, drenagem ou força o cancelamento de um job.

Console

  1. Acesse a página Jobs do Dataflow.

    Acessar "Jobs"

  2. Clique no job que você quer interromper.

    Para interromper um job, o status dele precisa serem execução.

  3. Na página de detalhes do job, clique em Parar.

  4. Escolha uma destas opções:

    • Em um pipeline em lote, clique em Cancelar ou Forçar cancelamento.

    • Para um pipeline de streaming, clique em Cancelar, Drenar ou Forçar cancelamento.

  5. Confirme sua escolha clicando em Interromper job.

gcloud

Para drenar ou cancelar um job do Dataflow, use o comando gcloud dataflow jobs no Cloud Shell ou em um terminal local instalado com a CLI gcloud.

  1. Faça login no shell.

  2. Liste os IDs dos jobs do Dataflow em execução e anote o ID do job que você quer interromper:

    gcloud dataflow jobs list
    

    Se a sinalização --region não estiver definida, serão exibidos os jobs do Dataflow de todas as regiões disponíveis.

  3. Siga uma das seguintes ações:

    • Para drenar um job de streaming:

      gcloud dataflow jobs drain JOB_ID
      

      Substitua JOB_ID pelo ID do job copiado anteriormente.

    • Para cancelar um job em lote ou de streaming:

      gcloud dataflow jobs cancel JOB_ID
      

      Substitua JOB_ID pelo ID do job copiado anteriormente.

    • Para cancelar um job em lote ou de streaming:

      gcloud dataflow jobs cancel JOB_ID --force
      

      Substitua JOB_ID pelo ID do job copiado anteriormente.

API

Para cancelar ou drenar um job usando a API REST do Dataflow, escolha projects.locations.jobs.update ou projects.jobs.update. No corpo da solicitação, transmita o estado do job necessário no campo requestedState da instância do job da API escolhida.

Importante:: usar projects.locations.jobs.update é recomendado, já que projects.jobs.update só permite atualizar o estado dos jobs em execução em us-central1.

  • Para cancelar o job, defina o estado dele como JOB_STATE_CANCELLED.

  • Para drenar o job, defina o estado dele como JOB_STATE_DRAINED.

  • Para forçar o cancelamento do job, defina o estado dele como JOB_STATE_CANCELLED com o rótulo "force_cancel_job": "true". O corpo da solicitação é:

    ​​{
      "requestedState": "JOB_STATE_CANCELLED",
      "labels": {
        "force_cancel_job": "true"
      }
    }
    

Detectar a conclusão do job do Dataflow

Para detectar quando o cancelamento ou a drenagem do job foi concluído, use um dos seguintes métodos:

  • Usar um serviço de orquestração de fluxo de trabalho, como o Cloud Composer, para monitorar o job do Dataflow.
  • Execute o pipeline de maneira síncrona, de modo que as tarefas sejam bloqueadas até a conclusão do pipeline. Para mais informações, consulte Como controlar modos de execução em "Como definir opções de pipeline".
  • Use a ferramenta de linha de comando na Google Cloud CLI para pesquisar o status do job. Para ver uma lista de todos os jobs do Dataflow no projeto, execute o seguinte comando no shell ou no terminal:

    gcloud dataflow jobs list
    

    A saída mostra o ID, o nome, o status (STATE) e outras informações do job para cada um. Para mais informações, consulte Como usar a interface de linha de comando do Dataflow.

Arquivar jobs do Dataflow

Quando você arquiva um job do Dataflow, ele é removido da lista de jobs na página Jobs do Dataflow no console. O job é movido para uma lista de jobs arquivados. Só é possível arquivar jobs concluídos, o que inclui jobs nos seguintes estados:

  • JOB_STATE_CANCELLED
  • JOB_STATE_DRAINED
  • JOB_STATE_DONE
  • JOB_STATE_FAILED
  • JOB_STATE_UPDATED

Para mais informações, consulte Detectar a conclusão do job do Dataflow neste documento. Para informações sobre solução de problemas, consulte Erros de jobs do Archive em "Resolver erros do Dataflow".

Todos os jobs concluídos são excluídos após um período de armazenamento de 30 dias.

Arquivar um job

Siga estas etapas para remover um job concluído da lista principal na página Jobs do Dataflow.

Console

  1. No console do Google Cloud, acesse a página Jobs do Dataflow.

    Acessar "Jobs"

    Uma lista de jobs do Dataflow é exibida junto com o status deles.

  2. Selecione um job.

  3. Na página Detalhes do job, clique em Arquivar. Se o job não foi concluído, a opção Arquivar não estará disponível.

API

Para arquivar jobs usando a API, utilize o campo JobMetadata. No campo JobMetadata, para userDisplayProperties, use o par de chave-valor "archived":"true".

A solicitação de API também precisa incluir o parâmetro de consulta updateMask.

curl --request PUT \

"https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID/?updateMask=job_metadata.user_display_properties._archived" \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Accept: application/json" \
  -H "Content-Type: application/json" \
  --data
'{"job_metadata":{"userDisplayProperties":{"archived":"true"}}}' \
  --compressed

Substitua:

Acessar e restaurar jobs arquivados

Siga estas etapas para ver jobs arquivados ou restaurar jobs arquivados para a lista principal na página Jobs do Dataflow.

Console

  1. No console do Google Cloud, acesse a página Jobs do Dataflow.

    Acessar "Jobs"

  2. Clique no botão de ativação Arquivados. Será exibida uma lista de jobs arquivados do Dataflow.

  3. Selecione um job.

  4. Para restaurar a lista principal de jobs na página Jobs do Dataflow, na página Detalhes do job, clique em Restaurar.

API

Para restaurar jobs usando a API, use o campo JobMetadata. No campo JobMetadata, para userDisplayProperties, use o par de chave-valor "archived":"false".

A solicitação de API também precisa incluir o parâmetro de consulta updateMask.

curl --request PUT \

"https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID/?updateMask=job_metadata.user_display_properties._archived" \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Accept: application/json" \
  -H "Content-Type: application/json" \
  --data
'{"job_metadata":{"userDisplayProperties":{"archived":"false"}}}' \
  --compressed

Substitua:

A seguir