Implemente pipelines do Dataflow

Este documento fornece uma vista geral da implementação de pipelines e realça algumas das operações que pode realizar num pipeline implementado.

Execute o pipeline

Depois de criar e testar o pipeline do Apache Beam, execute-o. Pode executar o seu pipeline localmente, o que lhe permite testar e depurar o seu pipeline Apache Beam, ou no Dataflow, um sistema de processamento de dados disponível para executar pipelines Apache Beam.

Executar localmente

Execute o pipeline localmente.

Java

O seguinte exemplo de código, retirado do início rápido, mostra como executar o pipeline WordCount localmente. Para saber mais, veja como executar o pipeline Java localmente.

No terminal, execute o seguinte comando:

  mvn compile exec:java \
      -Dexec.mainClass=org.apache.beam.examples.WordCount \
      -Dexec.args="--output=counts"
  

Python

O seguinte exemplo de código, retirado do início rápido, mostra como executar o pipeline WordCount localmente. Para saber mais, veja como executar o pipeline Python localmente.

No terminal, execute o seguinte comando:

python -m apache_beam.examples.wordcount \ --output outputs

Go

O seguinte exemplo de código, retirado do início rápido, mostra como executar o pipeline WordCount localmente. Para saber mais, veja como executar o pipeline do Go localmente.

No terminal, execute o seguinte comando:

    go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output outputs
  

Saiba como executar o pipeline localmente, no seu computador, usando o executador direto.

Execução no Dataflow

Execute o pipeline no Dataflow.

Java

O seguinte exemplo de código, retirado do início rápido, mostra como executar o pipeline WordCount no Dataflow. Para saber mais, veja como executar o seu pipeline Java no Dataflow.

No terminal, execute o seguinte comando (a partir do diretório word-count-beam):

  mvn -Pdataflow-runner compile exec:java \
    -Dexec.mainClass=org.apache.beam.examples.WordCount \
    -Dexec.args="--project=PROJECT_ID \
    --gcpTempLocation=gs://BUCKET_NAME/temp/ \
    --output=gs://BUCKET_NAME/output \
    --runner=DataflowRunner \
    --region=REGION"
    

Substitua o seguinte:

  • PROJECT_ID: o ID do seu Google Cloud projeto
  • BUCKET_NAME: o nome do seu contentor do Cloud Storage
  • REGION: uma região do Dataflow, como us-central1

Python

O seguinte exemplo de código, retirado do início rápido, mostra como executar o pipeline WordCount no Dataflow. Para saber mais, veja como executar o seu pipeline Python no Dataflow.

No terminal, execute o seguinte comando:

python -m apache_beam.examples.wordcount \
    --region DATAFLOW_REGION \
    --input gs://dataflow-samples/shakespeare/kinglear.txt \
    --output gs://STORAGE_BUCKET/results/outputs \
    --runner DataflowRunner \
    --project PROJECT_ID \
    --temp_location gs://STORAGE_BUCKET/tmp/

Substitua o seguinte:

  • DATAFLOW_REGION: a região onde quer implementar a tarefa do Dataflow, por exemplo, europe-west1

    A flag --region substitui a região predefinida definida no servidor de metadados, no cliente local ou nas variáveis de ambiente.

  • STORAGE_BUCKET: o nome do Cloud Storage que copiou anteriormente
  • PROJECT_ID: o Google Cloud ID do projeto que copiou anteriormente

Go

O seguinte exemplo de código, retirado do início rápido, mostra como executar o pipeline WordCount no Dataflow. Para saber mais, veja como executar o seu pipeline Go no Dataflow.

No terminal, execute o seguinte comando:

  posix-terminal go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
    --output gs://STORAGE_BUCKET/results/outputs \
    --runner dataflow \
    --project PROJECT_ID \
    --region DATAFLOW_REGION \
    --staging_location gs://STORAGE_BUCKET/binaries/
  

Substitua o seguinte:

  • STORAGE_BUCKET: o nome do contentor do Cloud Storage.
  • PROJECT_ID: o Google Cloud ID do projeto.
  • DATAFLOW_REGION: A região onde quer implementar a tarefa do Dataflow. Por exemplo, europe-west1. Para ver uma lista de localizações disponíveis, consulte o artigo Localizações do Dataflow. Tenha em atenção que a flag --region substitui a região predefinida definida no servidor de metadados, no cliente local ou nas variáveis de ambiente.

Saiba como executar o seu pipeline no serviço Dataflow, usando o executor do Dataflow.

Quando executa o pipeline no Dataflow, o Dataflow transforma o código do pipeline do Apache Beam num trabalho do Dataflow. O Dataflow gere totalmente os serviços da Google Cloud Platform por si, como o Compute Engine e o Cloud Storage, para executar a sua tarefa do Dataflow e inicia e termina automaticamente os recursos necessários. Pode saber mais sobre como o Dataflow transforma o seu código Apache Beam num trabalho do Dataflow em Ciclo de vida do pipeline.

Validação de pipeline

Quando executa o pipeline no Dataflow, antes do lançamento da tarefa, o Dataflow executa testes de validação no pipeline. Quando um teste de validação encontra problemas com o pipeline, o Dataflow falha o envio do trabalho antecipadamente. Nos registos de tarefas, o Dataflow inclui mensagens com o seguinte texto. Cada mensagem também inclui detalhes sobre as conclusões da validação e instruções para resolver o problema.

The preflight pipeline validation failed for job JOB_ID.

Os testes de validação executados dependem dos recursos e dos serviços que a sua tarefa do Dataflow usa.

  • Se a API Service Usage estiver ativada para o seu projeto, os testes de validação do pipeline verificam se os serviços necessários para executar a tarefa do Dataflow estão ativados.
  • Se a API Cloud Resource Manager estiver ativada para o seu projeto, os testes de validação do pipeline verificam se tem as configurações ao nível do projeto necessárias para executar o seu trabalho do Dataflow.

Para mais informações sobre a ativação de serviços, consulte o artigo Ativar e desativar serviços.

Para obter informações sobre como resolver problemas de autorização detetados durante a validação do pipeline, consulte o artigo Falha na validação do pipeline.

Se quiser substituir a validação do pipeline e iniciar a tarefa com erros de validação, use a seguinte opção de serviço do pipeline:

Java

--dataflowServiceOptions=enable_preflight_validation=false

Python

--dataflow_service_options=enable_preflight_validation=false

Go

--dataflow_service_options=enable_preflight_validation=false

Defina opções de tubagens

Pode controlar alguns aspetos da forma como o Dataflow executa a sua tarefa definindo opções de pipeline no seu código de pipeline do Apache Beam. Por exemplo, pode usar opções de pipeline para definir se o pipeline é executado em máquinas virtuais de trabalho, no back-end do serviço Dataflow ou localmente.

Faça a gestão das dependências do pipeline

Muitos pipelines do Apache Beam podem ser executados com os ambientes de execução do Dataflow predefinidos. No entanto, alguns exemplos de utilização do processamento de dados beneficiam da utilização de bibliotecas ou classes adicionais. Nestes casos, pode ter de gerir as dependências do pipeline. Para mais informações sobre a gestão de dependências, consulte o artigo Faça a gestão das dependências de pipelines no Dataflow.

Monitorize o seu trabalho

O Dataflow oferece visibilidade dos seus trabalhos através de ferramentas como a interface de monitorização do Dataflow e a interface de linha de comandos do Dataflow.

Aceda a VMs de trabalho

Pode ver as instâncias de VM de um determinado pipeline através da Google Cloud consola. A partir daí, pode usar o SSH para aceder a cada instância. No entanto, depois de a tarefa ser concluída ou falhar, o serviço Dataflow é encerrado automaticamente e limpa as instâncias de VM.

Otimizações de tarefas

Além de gerir os Google Cloud recursos, o Dataflow realiza e otimiza automaticamente muitos aspetos do processamento paralelo distribuído por si.

Paralelização e distribuição

O Dataflow particiona automaticamente os seus dados e distribui o código do trabalhador para instâncias do Compute Engine para processamento paralelo. Para mais informações, consulte o artigo sobre paralelização e distribuição.

Otimizações de fusão e combinação

O Dataflow usa o código do pipeline para criar um gráfico de execução que representa as PCollections e as transformações do pipeline, e otimiza o gráfico para o desempenho e a utilização de recursos mais eficientes. O Dataflow também otimiza automaticamente operações potencialmente dispendiosas, como agregações de dados. Para mais informações, consulte os artigos Otimização da união e Otimização da combinação.

Funcionalidades de sintonização automática

O serviço Dataflow inclui várias funcionalidades que permitem o ajuste imediato da atribuição de recursos e da partição de dados. Estas funcionalidades ajudam o Dataflow a executar o seu trabalho o mais rapidamente e eficientemente possível. Estas funcionalidades incluem o seguinte:

Streaming Engine

Por predefinição, o executor de pipelines do Dataflow executa os passos do seu pipeline de streaming inteiramente em máquinas virtuais de trabalho, consumindo a CPU, a memória e o armazenamento do disco persistente do trabalhador. O Streaming Engine do Dataflow move a execução do pipeline das VMs de trabalho para o back-end do serviço Dataflow. Para mais informações, consulte o artigo Streaming Engine.

Programação flexível de recursos do Dataflow

O FlexRS do Dataflow reduz os custos de processamento em lote através da utilização de técnicas de agendamento avançadas, do serviço Dataflow Shuffle e de uma combinação de instâncias de máquinas virtuais (VM) antecipáveis e VMs normais. Ao executar VMs preemptíveis e VMs normais em paralelo, o Dataflow melhora a experiência do utilizador se o Compute Engine parar instâncias de VMs preemptíveis durante um evento do sistema. O FlexRS ajuda a garantir que o pipeline continua a progredir e que não perde o trabalho anterior quando o Compute Engine antecipa as suas VMs antecipáveis. Para mais informações sobre o FlexRS, consulte o artigo Usar o agendamento flexível de recursos no Dataflow.

VM protegida do Dataflow

A partir de 1 de junho de 2022, o serviço Dataflow usa a VM protegida para todos os trabalhadores. Para saber mais acerca das capacidades da VM protegida, consulte o artigo VM protegida.