Perguntas frequentes

Na seção a seguir, mostramos respostas para algumas perguntas frequentes sobre o Dataflow.

Perguntas gerais

Onde encontrar mais suporte?

Visite o Suporte do Google Cloud para receber um pacote de suporte do Google Cloud, incluindo o Dataflow.

Use o StackOverflow para pesquisar sua dúvida ou enviar uma nova pergunta. Ao enviá-la, marque a pergunta com google-cloud-dataflow (em inglês). Esse grupo é monitorado por membros da equipe de engenharia do Google que têm o prazer de responder às suas perguntas.

Envie também perguntas, solicitações de recursos, relatórios de bugs ou defeitos e feedback pelo fórum UserVoice.

É possível compartilhar dados entre instâncias de pipelines?

Não há nenhum mecanismo de comunicação cruzada de pipeline específico do Dataflow para compartilhamento de dados ou processamento de contexto entre pipelines. Use um armazenamento durável, como o Cloud Storage, ou um cache na memória, como o App Engine, para compartilhar dados entre instâncias de pipelines.

Existe um mecanismo de programação integrado para executar pipelines em um momento ou intervalo específico?

Para automatizar a execução de pipelines, é possível:

Como saber qual a versão de SDK do Dataflow está instalada/em execução no ambiente?

Os detalhes da instalação dependem do ambiente de desenvolvimento. Se você está usando o Maven, pode ter várias versões de SDK do Dataflow "instaladas" em um ou mais repositórios locais do Maven.

Java

Para descobrir qual versão do SDK do Dataflow um determinado pipeline está executando, observe a saída do console ao executar com DataflowPipelineRunner ou BlockingDataflowPipelineRunner. O console contém uma mensagem como esta, com informações de versão do SDK do Dataflow:

Python

Para descobrir qual versão do SDK do Dataflow um determinado pipeline está executando, é possível analisar a saída do console ao executar com DataflowRunner. O console contém uma mensagem como esta, com informações de versão do SDK do Dataflow:

  INFO: Executing pipeline on the Dataflow Service, ...
  Dataflow SDK version: <version>

Como interagir com o job do Cloud Dataflow

É possível acessar as máquinas do worker do meu job (VMs do Compute Engine) enquanto o pipeline está em execução?

É possível visualizar as instâncias de VM para um determinado pipeline usando o Console do Google Cloud. A partir desse ponto, use o SSH para acessar cada instância. No entanto, se seu job falha ou é concluído, o serviço Dataflow é desligado automaticamente e limpa as instâncias de VM.

Por que não consigo ver o tempo de CPU reservado para meu job de streaming na interface de monitoramento do Cloud Dataflow?

O serviço Dataflow relata o tempo de CPU reservado após a conclusão dos jobs. Para jobs ilimitados, isso significa que o tempo de CPU reservado só é informado após o cancelamento ou a falha dos jobs.

Por que as informações sobre estado e marca d'água do job não estão disponíveis nos jobs de streaming atualizados recentemente na interface de monitoramento do Cloud Dataflow?

Várias alterações são feitas pela operação "Atualização" e levam alguns minutos para serem propagadas até a Interface de monitoramento do Dataflow. Tente atualizar a interface de monitoramento cinco minutos depois de atualizar o job.

Por que minhas transformações compostas personalizadas aparecem expandidas na Interface de monitoramento do Dataflow?

No código do pipeline, você pode ter invocado sua transformação composta, da seguinte forma:

result = transform.apply(input);

Transformações compostas invocadas dessa maneira omitem o aninhamento esperado e podem aparecer expandidas na Interface de monitoramento do Dataflow. O pipeline também gera avisos ou erros sobre nomes únicos estáveis no ambiente de execução do pipeline.

Para evitar esses problemas, chame a transformação usando o formato recomendado:

result = input.apply(transform);

Por que não consigo mais ver as informações do meu job em andamento na interface de monitoramento do Cloud Dataflow, se antes elas apareciam?

Há um problema conhecido que pode afetar alguns jobs do Dataflow gerados há um mês ou mais. Esses jobs podem falhar durante o carregamento na Interface de monitoramento do Dataflow ou podem mostrar informações desatualizadas, mesmo que o job tenha ficado visível.

Você ainda consegue consultar o status do job na lista de jobs ao usar o Monitoramento ou as Interfaces de linha de comando do Dataflow. No entanto, quando esse problema ocorre, não é possível visualizar detalhes do job.

Programação com o SDK do Apache Beam para Java

Posso passar outros dados (fora da faixa) para uma operação ParDo atual?

Sim. Há vários padrões que podem ser seguidos, dependendo do caso de uso:

  • É possível serializar informações como campos em uma subclasse DoFn.
  • Todas as variáveis referenciadas pelos métodos em DoFn anônimo são serializadas automaticamente.
  • É possível computar dados dentro de DoFn.startBundle().
  • É possível passar dados por ParDo.withSideInputs.

Para mais informações, consulte a documentação do ParDo (em inglês), em especial as seções sobre como criar um DoFn e sobre entradas secundárias, bem como a documentação de referência da API para Java do ParDo.

Como as exceções Java são processadas no Cloud Dataflow?

O pipeline pode lançar exceções durante o processamento de dados. Alguns desses erros são transitórios, por exemplo, a dificuldade temporária em acessar um serviço externo. Outros são permanentes, como erros causados por dados de entrada corrompidos ou impossíveis de analisar, ou ponteiros nulos durante a computação.

O Dataflow processa elementos em grupos arbitrários e repete todo o grupo quando é gerado um erro para qualquer elemento desse grupo. Na execução no modo em lote, os pacotes que incluem um item com falha são repetidos quatro vezes. A falha do pipeline será total quando um único pacote falhar quatro vezes. Em execuções no modo de streaming, um pacote incluindo um item com falha é repetido indefinidamente, o que pode causar a parada permanente do pipeline.

Exceções no código do usuário, como instâncias DoFn, são relatadas na interface de monitoramento do Dataflow. Ao executar o pipeline com BlockingDataflowPipelineRunner, você também verá mensagens de erro impressas no console ou na janela de terminal.

Proteja o código contra erros adicionando gerenciadores de exceção. Por exemplo, se quiser soltar elementos que falham em alguma validação de entrada personalizada feita em um ParDo, use um bloco try/catch no ParDo para manipular a exceção e soltar o elemento. Para rastrear a contagem de erros, use transformações de agregação.

Programar com o SDK do Cloud Dataflow para Python

Como gerenciar NameErrors?

Se você estiver recebendo um NameError quando executar o pipeline usando o serviço Dataflow, mas não quando o executa localmente (isto é, usando a DirectRunner), as DoFns podem estar usando valores no namespace global que não estão disponíveis no worker do Dataflow.

Por padrão, importações, funções e variáveis globais definidas na sessão principal não são salvas durante a serialização de um job do Dataflow. Se, por exemplo, DoFns estiverem definidos no arquivo principal e nas importações e funções de referência no namespace global, será possível definir a opção de pipeline --save_main_session como True. Isso faz com que o estado do namespace global seja conservado e carregado no trabalhador do Dataflow.

Se houver objetos no namespace global que não podem ser preservados, você receberá um erro de pickling. Se o erro se referir a um módulo que deve estar disponível na distribuição do Python, resolva isso importando o módulo localmente, onde ele é usado.

Por exemplo, em vez de:

import re
…
def myfunc():
  # use re module

use:

def myfunc():
  import re
  # use re module

Como alternativa, se os DoFns abrangerem vários arquivos, use uma abordagem diferente para empacotar o fluxo de trabalho e gerenciar dependências (em inglês).

E/S de pipeline

A fonte e o coletor de TextIO suportam arquivos compactados, como GZip?

Sim. Com o Dataflow Java, é possível ler arquivos compactados com gzip e bzip2. Para saber mais, consulte a documentação de TextIO.

Posso usar uma expressão regular para direcionar arquivos específicos com a fonte TextIO?

O Dataflow é compatível com padrões gerais de caractere curinga. A expressão glob pode aparecer em qualquer lugar no caminho do arquivo. Entretanto, ele não é compatível com caracteres curinga recursivos (**).

A fonte de entrada de TextIO é compatível com JSON?

Sim. No entanto, para que o serviço Dataflow possa carregar em paralelo a entrada e a saída, os dados de origem precisam ser delimitados com um avanço de linha.

Por que o rebalanceamento do trabalho dinâmico não é ativado com a fonte personalizada?

O rebalanceamento de trabalho dinâmico usa o valor de retorno do método getProgress() da fonte personalizada para ser ativado. A implementação padrão para getProgress() retorna null. Para garantir que o escalonamento automático seja ativado, faça com que a fonte personalizada substitua getProgress() para retornar um valor apropriado.

Como acessar conjuntos de dados do BigQuery ou tópicos/assinaturas do Pub/Sub que pertençam a um projeto diferente do Google Cloud Platform, ou seja, que não pertençam ao projeto em uso com o Cloud Dataflow?

Consulte o guia Segurança e permissões do Dataflow para mais informações sobre como acessar dados do BigQuery ou Pub/Sub em um projeto diferente do Google com que você usa o Dataflow.

Por que recebo erros de "rateLimitExceeded" ao usar o conector do BigQuery e o que posso fazer em relação a isso?

O BigQuery tem limites de cota de curto prazo que se aplicam quando muitas solicitações de API são enviadas durante um curto período. É possível que o pipeline do Dataflow exceda temporariamente essa cota. Sempre que isso acontece, as solicitações de API do seu pipeline do Dataflow para o BigQuery podem falhar, o que pode resultar em erros rateLimitExceeded nos registros do worker. O Dataflow repete essas falhas para que seja possível ignorar os erros com segurança. Se você acredita que o pipeline foi afetado significativamente por causa de erros rateLimitExceeded, entre em contato com o suporte do Google Cloud.

Estou usando o conector do BigQuery para gravar no BigQuery usando inserções de streaming e minha capacidade de gravação é menor que o esperado. O que posso fazer para resolver isso?

A capacidade lenta pode ser causada porque o pipeline excede a cota de inserção de streaming do BigQuery disponível. Se esse for o caso, você verá mensagens de erro relacionadas à cota do BigQuery nos registros do worker do Dataflow (procure erros quotaExceeded). Se você encontrar esses erros, configure a opção do coletor do BigQuery ignoreInsertIds() ao usar o SDK do Apache Beam para Java ou usar a opção ignore_insert_ids ao utilizar o SDK do Apache Beam para Python, de forma que se torne automaticamente qualificado para ganhar 1 GB/s por capacidade de inserção de streaming do BigQuery por projeto. Para mais informações sobre ressalvas relacionadas à eliminação de duplicação automática de mensagens, consulte a documentação do BigQuery. Para aumentar a cota de inserção de streaming do BigQuery para mais de 1 GB/s, envie uma solicitação por meio do Console do Cloud.

Se você não vê erros relacionados à cota nos registros do worker, é possível que o agrupamento padrão ou os parâmetros em lote não forneçam o paralelismo adequado para o pipeline. Há várias configurações relacionadas ao conector do BigQuery do Dataflow que podem ser consideradas para alcançar o desempenho esperado ao gravar no BigQuery usando inserções de streaming. Por exemplo, no SDK do Apache Beam para Java, ajuste numStreamingKeys para corresponder ao número máximo de workers e considere aumentar insertBundleParallelism para configurar o conector do BigQuery para gravar no BigQuery usando linhas de execução mais paralelas. Para ver as configurações disponíveis no SDK do Apache Beam para Java, consulte BigQueryPipelineOptions e consulte as configurações disponíveis no SDK do Apache Beam para Python na transformação do WriteToBigQuery.

Streaming

Como executar o pipeline no modo de streaming?

Ao executar o pipeline, configure a sinalização --streaming na linha de comando. Também é possível definir o modo de streaming de maneira programática durante a construção do pipeline.

Que fontes de dados e coletores são compatíveis com o modo de streaming?

É possível ler dados de streaming a partir do Pub/Sub e gravar dados de streaming no Pub/Sub ou no BigQuery.

Quais são as limitações atuais do modo de streaming?

Fontes de lote ainda não são suportadas no modo de streaming.

O canal de streaming que faz leituras no Pub/Sub está lento. O que posso fazer?

O projeto pode ter cota de Pub/Sub insuficiente. Para descobrir se o projeto tem cota insuficiente, verifique erros do cliente 429 (Rate limit exceeded):

  1. Acesse o Console do Google Cloud.
  2. No menu à esquerda, selecione APIs e serviços.
  3. Na caixa de pesquisa, consulte Cloud Pub/Sub.
  4. Clique na guia Uso.
  5. Verifique os Códigos de resposta e procure códigos de erro de cliente (4xx).

Por que o dimensionamento do job de streaming não está sendo feito corretamente ao atualizar o pipeline com um pool maior de workers?

Java

Para jobs de streaming que não usam o Streaming Engine, não é possível escalonar além do número inicial de workers e recursos do Persistent Disk alocados no início do job original. Ao atualizar um job do Dataflow e especificar um número maior de workers no novo job, especifique somente um número de workers igual ao --maxNumWorkers especificado para o job original.

Python

Para jobs de streaming que não usam o Streaming Engine, não é possível escalonar além do número inicial de workers e recursos do Persistent Disk alocados no início do job original. Ao atualizar um job do Dataflow e especificar um número maior de workers no novo job, especifique somente um número de workers igual ao --max_num_workers especificado para o job original.

Escalonamento automático de streaming

Como ter um número fixo de workers?

Para ativar o dimensionamento automático de streaming, é necessário aceitá-lo. Ele não é ativado por padrão. A semântica das opções atuais não muda. Portanto, para manter um número fixo de workers, não é necessário fazer nada.

Receio que o escalonamento automático aumente o valor de minha fatura. Como limitá-lo?

Java

Ao especificar --maxNumWorkers, você limita o intervalo de dimensionamento usado para processar o job.

Python

Ao especificar --max_num_workers, você limita o intervalo de dimensionamento usado para processar o job.

Qual é o intervalo de escalonamento para pipelines de escalonamento automático de streaming?

Java

Para jobs de escalonamento automático de streaming que não usam Streaming Engine, o serviço do Dataflow aloca de 1 a 15 Persistent Disks para cada worker. Isso significa que o número mínimo de workers usados para um pipeline de escalonamento automático de streaming é N/15, em que N é o valor de --maxNumWorkers.

Para jobs de escalonamento automático de streaming que usam o Streaming Engine, o número mínimo de workers é 1.

O Dataflow equilibra o número de discos permanentes entre os workers. Por exemplo, caso o pipeline precise de 3 ou 4 workers estáveis, defina --maxNumWorkers=15. O pipeline escalona automaticamente entre 1 e 15 workers, usando 1, 2, 3, 4, 5, 8 ou 15 workers, que correspondem a 15, 8, 5, 4, 3, 2 ou 1 discos permanentes por worker, respectivamente.

--maxNumWorkers pode ser de, no máximo, 1.000.

Python

Para jobs de escalonamento automático de streaming que não usam Streaming Engine, o serviço do Dataflow aloca de 1 a 15 Persistent Disks para cada worker. Isso significa que o número mínimo de workers usados para um pipeline de escalonamento automático de streaming é N/15, em que N é o valor de --max_num_workers.

Para jobs de escalonamento automático de streaming que usam o Streaming Engine, o número mínimo de workers é 1.

O Dataflow equilibra o número de discos permanentes entre os workers. Por exemplo, caso o pipeline precise de 3 ou 4 workers estáveis, defina --max_num_workers=15. O pipeline escalona automaticamente entre 1 e 15 workers, usando 1, 2, 3, 4, 5, 8 ou 15 workers, que correspondem a 15, 8, 5, 4, 3, 2 ou 1 discos permanentes por worker, respectivamente.

--max_num_workers pode ser de, no máximo, 1.000.

Qual é o número máximo de workers que o escalonamento automático pode usar?

Java

O Cloud Dataflow opera dentro dos limites da cota de contagem da instância do Compute Engine do projeto ou maxNumWorkers, o que for menor.

Python

O Cloud Dataflow opera dentro dos limites da cota de contagem da instância do Compute Engine do projeto ou max_num_workers, o que for menor.

Posso desativar o escalonamento automático no pipeline de streaming?

Java

Sim. Defina --autoscalingAlgorithm=NONE. Atualize o pipeline com especificações de cluster fixas, conforme descrito na documentação de escalonamento manual, em que numWorkers está dentro do intervalo de escalonamento.

Python

Sim. Defina --autoscaling_algorithm=NONE. Atualize o pipeline com especificações de cluster fixas, conforme descrito na documentação de escalonamento manual, em que num_workers está dentro do intervalo de escalonamento.

Posso alterar o intervalo de escalonamento no pipeline de streaming?

Java

Sim, mas isso não pode ser feito com Atualização*. É preciso interromper o pipeline com Cancelar ou Drenar e reimplantá-lo com o novo maxNumWorkers pretendido.

Python

Sim, mas isso não pode ser feito com Atualização*. É preciso interromper o pipeline com Cancelar ou Drenar e reimplantá-lo com o novo max_num_workers pretendido.

Como configurar o projeto do Google Cloud Platform para usar o Cloud Dataflow

Como posso determinar se o projeto que estou usando com o Cloud Dataflow é proprietário do bucket do Cloud Storage em que quero fazer leituras e gravações?

Para determinar se seu projeto do Google Cloud tem um determinado bucket do Cloud Storage, use o seguinte comando do console:

gsutil acl get gs://<your-bucket>

O comando gera uma string JSON semelhante a esta:

[
  {
    "entity": "project-owners-123456789",
    "projectTeam": {
      "projectNumber": "123456789",
      "team": "owners"
    },
    "role": "OWNER"
  },
  ....
]

As entradas relevantes são aquelas para as que o "papel" é proprietário. O projectNumber associado informa qual projeto é proprietário desse bucket. Se o número do projeto não corresponde ao número do seu projeto, execute uma das seguintes ações:

  • Crie um novo bucket que seja de propriedade do seu projeto.
  • Conceda o acesso apropriado de contas ao bucket.

Como criar um novo bucket de propriedade do meu projeto Cloud Dataflow?

Para criar um novo bucket no projeto do Google Cloud em que você está usando o Dataflow, use o seguinte comando do console:

gsutil mb -p <Project to own the bucket> <bucket-name>

Como fazer leituras e gravações em um bucket que pertence a um projeto diferente do Google Cloud Platform que estou usando com o Cloud Dataflow?

Consulte o guia Segurança e permissões do Dataflow para saber como o pipeline do Dataflow pode acessar os recursos do Google Cloud pertencentes a outro projeto do Google Cloud.

Quando tento executar meu job do Cloud Dataflow, aparece um erro com a mensagem: "É necessário ativar algumas APIs do Cloud em seu projeto para que o Cloud Dataflow execute esse job". O que eu faço?

Para executar um job do Dataflow, ative as seguintes APIs do Google Cloud no seu projeto:

  • API Compute Engine (Compute Engine)
  • API Cloud Logging
  • Cloud Storage
  • API Cloud Storage JSON
  • API BigQuery
  • Pub/Sub
  • API de armazenamento de dados

Consulte a seção Primeiros passos para ativar as APIs do Google Cloud para ter instruções detalhadas