Desenvolver e testar pipelines do Dataflow

Nesta página, apresentamos as práticas recomendadas para desenvolver e testar seu pipeline do Dataflow.

Visão geral

A forma como o código do pipeline é implementado tem uma influência significativa no desempenho do pipeline na produção. Para ajudar você a criar um código de pipeline que funcione corretamente e com eficiência, este documento explica os seguintes itens:

  • Executores de pipeline que oferecem suporte à execução de código nos diferentes estágios de desenvolvimento e implantação.
  • Ambientes de implantação que permitem executar pipelines durante o desenvolvimento, teste, pré-produção e produção.
  • Código de pipeline de código aberto e modelos prontos para uso ou que podem ser usados como base de novos pipelines para acelerar o desenvolvimento de código.
  • Práticas recomendadas de codificação para desenvolver o pipeline a fim de melhorar a observabilidade e o desempenho do pipeline. Muitas dessas práticas são aplicáveis à programação usando o SDK do Apache Beam (os exemplos se concentram no Java) e não são específicas do Dataflow. No entanto, em muitos casos, o Dataflow fornece recursos que complementam essas práticas de programação para melhorar a prontidão da produção.
  • Uma abordagem de práticas recomendadas para testar o código do pipeline. Primeiro, este documento fornece uma visão geral que inclui o escopo e o relacionamento de diferentes tipos de teste, como testes de unidade, testes de integração e testes completos. Em segundo lugar, cada tipo de teste é explorado em detalhes, incluindo métodos para criar e integrar com dados de teste e quais executores de pipeline usar para cada teste.

Executores de pipelines

Durante o desenvolvimento e os testes, você usa diferentes executores do Apache Beam para gerar o código do pipeline. O SDK do Apache Beam fornece o Direct Runner, um executor direto para desenvolvimento e teste locais. Suas ferramentas de automação de lançamento também podem usar o Direct Runner para testes de unidade e integração. Por exemplo, é possível usar o Direct Runner no pipeline de integração contínua (CI).

Os pipelines implantados no Dataflow usam o Dataflow Runner, que executa o pipeline em ambientes semelhantes a ambientes de produção. Além disso, é possível usar o Dataflow Runner para testes de desenvolvimento ad hoc e testes de pipeline completos.

Embora esta página se concentre na execução de pipelines criados com o SDK do Apache Beam para Java, o Dataflow também oferece suporte aos pipelines do Apache Beam que foram desenvolvidos usando Python e Go. Os SDKs do Apache Beam para Java, Python e Go estão com disponibilidade geral no Dataflow. Os desenvolvedores de SQL também podem usar a SQL do Apache Beam para criar pipelines que usam dialetos de SQL conhecidos.

Configurar um ambiente de implantação

Para separar usuários, dados, códigos e outros recursos em diferentes estágios de desenvolvimento, crie ambientes de implantação. Sempre que possível, para fornecer ambientes isolados nos diferentes estágios de desenvolvimento do pipeline, use projetos do Google Cloud separados.

As seções a seguir descrevem um conjunto típico de ambientes de implantação.

Ambiente local

O ambiente local é a estação de trabalho de um desenvolvedor. Para desenvolvimento e testes rápidos, use o Direct Runner para executar o código do pipeline localmente.

Os pipelines executados localmente usando o Direct Runner podem interagir com recursos remotos do Google Cloud, como tópicos do Pub/Sub ou tabelas do BigQuery. Dê aos desenvolvedores individuais projetos separados do Google Cloud para que eles tenham um sandbox para testes ad hoc com os serviços do Google Cloud.

Alguns serviços do Google Cloud, como o Pub/Sub e o Bigtable, fornecem emuladores para desenvolvimento local. É possível usar esses emuladores com o Direct Runner para ativar o desenvolvimento e o teste locais completos.

Ambiente de sandbox

O ambiente de sandbox é um projeto do Google Cloud que fornece aos desenvolvedores acesso aos serviços do Google Cloud durante o desenvolvimento do código. Os desenvolvedores de pipeline podem compartilhar um projeto do Google Cloud com outros desenvolvedores ou usar os próprios projetos individuais. O uso de projetos individuais reduz a complexidade do planejamento em relação ao uso de recursos compartilhados e ao gerenciamento de cotas.

Os desenvolvedores usam o ambiente de sandbox para a execução ad hoc do pipeline usando o Dataflow Runner. O ambiente sandbox é útil para depurar e testar códigos em um executor de produção durante a fase de desenvolvimento de código. Por exemplo, a execução ad-hoc do pipeline permite que os desenvolvedores:

  • observem o efeito das alterações do código no comportamento de escalonamento;
  • entendam as possíveis diferenças entre o comportamento do Direct Runner e do Dataflow Runner;
  • entendam como o Dataflow aplica as otimizações de gráfico.

Em testes ad hoc, os desenvolvedores podem implantar códigos do ambiente local para executar o Dataflow no ambiente de sandbox.

Ambiente de pré-produção

O ambiente de pré-produção serve para as fases de desenvolvimento que precisam ser executadas em condições de produção, como testes completos. Use um projeto separado para o ambiente de pré-produção e o configure para ser o mais semelhante possível ao de produção. Da mesma forma, para permitir testes completos em escala de produção, torne as cotas de projetos do Google Cloud para o Dataflow e outros serviços o mais semelhante possível ao ambiente de produção.

Dependendo dos requisitos, é possível separar ainda mais a pré-produção em vários ambientes. Por exemplo, um ambiente de controle de qualidade pode dar suporte ao trabalho de analistas de qualidade para testar objetivos de nível de serviço (SLOs), como precisão de dados, atualização e desempenho em diferentes condições de carga de trabalho.

Os testes completos incluem integração com fontes de dados e coletores dentro do escopo dos testes. Considere como disponibilizá-los no ambiente de pré-produção. É possível armazenar dados de teste no próprio ambiente de pré-produção. Por exemplo, os dados de teste são armazenados em um bucket do Cloud Storage com seus dados de entrada. Em outros casos, os dados de teste podem ser originados fora do ambiente de pré-produção, como um tópico do Pub/Sub por meio de uma assinatura separada que está no ambiente de produção. Para pipelines de streaming, também é possível executar testes completos usando dados gerados, por exemplo, com o Gerador de dados de streaming do Dataflow, para emular características e volumes de dados semelhantes à produção.

Para pipelines de streaming, use o ambiente de pré-produção para testar atualizações do pipeline antes que qualquer alteração seja feita na produção. É importante testar e verificar os procedimentos de atualização para pipelines de streaming, especialmente se você precisar coordenar várias etapas, como ao executar pipelines paralelos para evitar inatividade.

Ambiente de produção

O ambiente de produção é um projeto dedicado do Google Cloud. A entrega contínua copia os artefatos de implantação para o ambiente de produção quando todos os testes completos são aprovados.

Práticas recomendadas de desenvolvimento

Esta seção aborda as práticas recomendadas de codificação e desenvolvimento. Muitas dessas práticas complementam e melhoram aspectos do desenvolvimento e da operação do pipeline, como melhorar a produtividade do desenvolvedor, promover a testabilidade do pipeline, aumentar o desempenho e permitir insights mais detalhados com monitoramento.

Antes de iniciar o desenvolvimento, configure ambientes de implantação que ofereçam suporte ao seu ciclo de vida de desenvolvimento, teste e entrega.

Usar modelos fornecidos pelo Google

Para acelerar o desenvolvimento do pipeline, verifique se o Google fornece um modelo do Dataflow. Alguns modelos permitem adicionar lógica personalizada como uma etapa do pipeline. Por exemplo, a assinatura do Pub/Sub para o modelo do BigQuery fornece um parâmetro para executar uma função JavaScript definida pelo usuário (UDF, na sigla em inglês) que é armazenada no Cloud Storage. Os modelos fornecidos pelo Google são de código aberto sob a licença Apache 2.0, para que você possa usá-los como base para novos pipelines. Os modelos também são úteis como exemplos de código para referência.

Criar bibliotecas de transformações reutilizáveis

O modelo de programação do Apache Beam unifica o processamento de dados em lote e streaming, o que possibilita a reutilização de transformações. Criar uma biblioteca compartilhada de transformações comuns promove a reutilização, a capacidade de teste e a propriedade do código por diferentes equipes.

Considere os dois exemplos de código Java a seguir, que leem eventos de pagamento. O primeiro é de uma fonte ilimitada do Pub/Sub:

PipelineOptions options = PipelineOptionsFactory.create();

Pipeline p = Pipeline.create(options)

// Initial read transform
PCollection<PaymentEvent> payments =
    p.apply("Read from topic",
        PubSubIO.readStrings().withTimestampAttribute(...).fromTopic(...))
        .apply("Parse strings into payment events",
            ParDo.of(new ParsePaymentEventFn()));

O segundo é proveniente de uma fonte limitada de banco de dados relacional:

PipelineOptions options = PipelineOptionsFactory.create();

Pipeline p = Pipeline.create(options);

PCollection<PaymentEvent> payments =
    p.apply(
        "Read from database table",
        JdbcIO.<PaymentEvent>read()
            .withDataSourceConfiguration(...)
            .withQuery(...)
            .withRowMapper(new RowMapper<PaymentEvent>() {
              ...
            }));

Supondo que ambos os pipelines executem o mesmo processamento, eles podem usar as mesmas transformações de uma biblioteca compartilhada para as etapas de processamento restantes. A implementação das práticas recomendadas de reutilização de código varia de acordo com a linguagem de programação e a ferramenta de build. Por exemplo, se você usa o Maven, é possível separar o código de transformação no próprio módulo. Em seguida, você pode incluir o módulo como um submódulo em projetos com vários módulos maiores para pipelines diferentes, conforme mostrado no exemplo de código a seguir:

// Reuse transforms across both pipelines
payments
    .apply("ValidatePayments", new PaymentTransforms.ValidatePayments(...))
    .apply("ProcessPayments", new PaymentTransforms.ProcessPayments(...))
  ...

Para mais informações, consulte a documentação do Apache Beam para conferir práticas recomendadas na gravação de códigos de usuário para transformações do Apache Beam e o guia de estilo recomendado para PTransforms.

Usar filas de mensagens inativas para tratamento de erros

O pipeline pode encontrar situações em que não é possível processar elementos. Isso pode ocorrer por diferentes motivos, mas uma causa comum são problemas de dados. Por exemplo, um elemento que contém JSON mal formatado pode causar falhas de análise.

Nessa situação, uma abordagem é capturar uma exceção no método DoFn.ProcessElement. No bloco de exceções, registre o erro e remova o elemento. No entanto, isso faz com que os dados sejam perdidos e impede que eles sejam inspecionados posteriormente para processamento manual ou solução de problemas.

Uma abordagem melhor é usar um padrão chamado fila de mensagens inativas (ou arquivo de mensagens inativas). Você captura exceções no método DoFn.ProcessElement e registra erros como faria normalmente. Em vez de remover o elemento com falha, use saídas de ramificação para gravá-lo em um objeto PCollection separado. Esses elementos podem ser gravados em um coletor de dados para inspeção e processamento posteriores usando uma transformação separada.

O exemplo de código Java a seguir mostra como implementar o padrão de fila de mensagens inativas:

TupleTag<Output> successTag = new TupleTag<>() {};
TupleTag<Input> deadLetterTag = new TupleTag<>() {};

PCollection<Input> input = /* ... */;

PCollectionTuple outputTuple =
    input.apply(ParDo.of(new DoFn<Input, Output>() {
      @Override
      void processElement(ProcessContext c) {
        try {
          c.output(process(c.element()));
        } catch (Exception e) {
          LOG.severe("Failed to process input {} -- adding to dead-letter file",
              c.element(), e);
          c.sideOutput(deadLetterTag, c.element());
        }
      }).withOutputTags(successTag, TupleTagList.of(deadLetterTag)));

// Write the dead-letter inputs to a BigQuery table for later analysis
outputTuple.get(deadLetterTag)
    .apply(BigQueryIO.write(...));

// Retrieve the successful elements...
PCollection<Output> success = outputTuple.get(successTag);
// and continue processing ...

É possível usar o Cloud Monitoring para aplicar diferentes políticas de monitoramento e alerta na fila de mensagens inativas do pipeline. Por exemplo, é possível visualizar o número e o tamanho dos elementos processados pela transformação de mensagens inativas e configurar os alertas para serem acionados se determinadas condições de limite forem atendidas.

Gerenciar mutações de esquema

É possível processar dados que tenham esquemas inesperados (mas válidos) usando um padrão de mensagens inativas, que grava elementos com falha em um objeto PCollection separado. No entanto, em alguns casos, é melhor processar automaticamente os elementos que refletem um esquema alterado como elementos válidos. Por exemplo, se o esquema de um elemento refletir uma mutação, como a adição de novos campos, será possível adaptar o esquema do coletor de dados para acomodar mutações.

A mutação automática de esquema depende da abordagem de ramificação e da saída usada pelo padrão de mensagens inativas. No entanto, nesse caso, ele aciona uma transformação que modifica o esquema de destino sempre que esquemas aditivos forem encontrados. Para ver um exemplo dessa abordagem, consulte Como processar a modificação de esquemas JSON em um pipeline de streaming, com a Square Enix no blog do Google Cloud.

Escolha corretamente entre entradas secundárias ou CoGroupByKey para junções

A junção de conjuntos de dados é um caso de uso comum para pipelines de dados. As entradas secundárias fornecem uma maneira flexível de resolver problemas comuns de processamento de dados, como enriquecimento de dados e pesquisas com chaves. Ao contrário dos objetos PCollection, as entradas secundárias também são mutáveis e podem ser determinadas no ambiente de execução. Por exemplo, os valores em uma entrada secundária podem ser calculados por outra ramificação no pipeline ou determinados chamando um serviço remoto.

O Dataflow é compatível com entradas secundárias usando a persistência de dados no armazenamento permanente (semelhante a um disco compartilhado), o que disponibiliza a entrada secundária completa para todos os workers. Os tamanhos de entrada secundária podem ser muito grandes e não caberem na memória do worker. A leitura de uma entrada secundária grande pode causar problemas de desempenho se os workers precisarem ler constantemente do armazenamento permanente.

A transformação CoGroupByKey é uma transformação principal do Apache Beam que combina (nivela) vários objetos PCollection e elementos de grupos que tenham uma chave comum. Diferente de uma entrada secundária, que disponibiliza todos os dados de entrada secundária para cada worker, a CoGroupByKey executa uma operação de embaralhamento (agrupamento) para distribuir dados entre os workers. Portanto, CoGroupByKey é ideal quando os objetos PCollection que você quer mesclar são muito grandes e não se encaixam na memória do worker.

Siga estas diretrizes para decidir se quer usar entradas secundárias ou CoGroupByKey:

  • Use entradas secundárias quando um dos objetos PCollection que você está mesclando for desproporcionalmente menor que o outro e o objeto PCollection menor se encaixar na memória do worker. Armazenar toda a entrada secundária em cache na memória torna a busca de elementos rápida e eficiente.
  • Use CoGroupByKey se você precisar buscar uma grande proporção de um objeto PCollection que exceda significativamente a memória do worker.
  • Use entradas secundárias quando tiver um objeto PCollection que deva ser unido várias vezes no pipeline. Em vez de usar várias transformações CoGroupByKey, é possível criar uma única entrada secundária que possa ser reutilizada por várias transformações ParDo.

Para mais informações, consulte Resolver problemas de falta de memória no Dataflow.

Minimizar operações caras por elemento

Uma instância DoFn processa lotes de elementos chamados pacotes (unidades atômicas de trabalho que consistem em zero ou mais elementos). Os elementos individuais são processados pelo método DoFn.ProcessElement, que é executado para cada elemento. Como o método DoFn.ProcessElement é chamado para cada elemento, qualquer operação demorada ou computacionalmente cara invocada por ele faz com que essas operações sejam executadas para cada elemento que ele processa.

Se você precisar realizar operações caras apenas uma vez para um lote de elementos, inclua essas operações nos métodos DoFn.Setup e DoFn.StartBundle em vez de em DoFn.ProcessElement. Os exemplos incluem:

  • Análise de um arquivo de configuração que controla algum aspecto do comportamento da instância DoFn. Invoque essa ação apenas uma vez, quando a instância DoFn for inicializada usando o método DoFn.Setup.

  • Ao instanciar um cliente de curta duração que é reutilizado em todos os elementos de um pacote, por exemplo, quando todos os elementos do pacote são enviados por uma única conexão de rede. Invoque essa ação uma vez por pacote usando o método DoFn.StartBundle.

Limitar os tamanhos de lotes e as chamadas simultâneas a serviços externos

Ao chamar serviços externos, é possível reduzir as despesas por chamada usando a transformação GroupIntoBatches para criar lotes de elementos de um tamanho especificado. O envio em lote envia elementos para um serviço externo como um payload em vez de individualmente.

Em combinação com lotes, é possível limitar o número máximo de chamadas paralelas (simultâneas) ao serviço externo escolhendo as chaves apropriadas para particionar os dados recebidos. O número de partições determina o carregamento em paralelo máximo. Por exemplo, se cada elemento receber a mesma chave, uma transformação downstream para chamar o serviço externo não será executada em paralelo.

Considere uma das seguintes abordagens para produzir chaves para elementos:

  • Escolha um atributo do conjunto de dados para usar como chaves de dados, como IDs de usuário.
  • Gere chaves de dados para dividir elementos de maneira aleatória em um número fixo de partições, em que o número possível de chaves-valor determina o número de partições. Você precisa criar partições suficientes para o paralelismo, e cada partição precisa ter elementos suficientes para que GroupIntoBatches seja útil.

No exemplo de código Java a seguir, mostramos como dividir aleatoriamente elementos em 10 partições:

// PII or classified data which needs redaction.
PCollection<String> sensitiveData = ...;

int numPartitions = 10; // Number of parallel batches to create.
PCollection<KV<Long, Iterable<String>>> batchedData =
    sensitiveData
        .apply("Assign data into partitions",
            ParDo.of(new DoFn<String, KV<Long, String>>() {
              Random random = new Random();

              @ProcessElement
              public void assignRandomPartition(ProcessContext context) {
                context.output(
                  KV.of(randomPartitionNumber(), context.element()));
              }
              private static int randomPartitionNumber() {
                return random.nextInt(numPartitions);
              }
            }))
        .apply("Create batches of sensitive data",
            GroupIntoBatches.<Long, String>ofSize(100L));

// Use batched sensitive data to fully utilize Redaction API
// (which has a rate limit but allows large payloads)
batchedData
    .apply("Call Redaction API in batches", callRedactionApiOnBatch());

Identificar problemas de desempenho causados por etapas com a combinação inadequada

O Dataflow cria um gráfico de etapas que representa o pipeline, com base nas transformações e nos dados usados para criá-lo. Isso é chamado de gráfico de execução de pipeline.

Quando você implanta o pipeline, o Dataflow pode modificar o gráfico de execução do pipeline para melhorar o desempenho. Por exemplo, o Dataflow pode unir algumas operações, um processo conhecido como otimização de fusão, para evitar o impacto de desempenho e custo de cada gravação de objeto intermediário PCollection no pipeline.

Em alguns casos, o Dataflow pode determinar incorretamente a melhor forma de fundir operações no pipeline, o que pode limitar a capacidade do serviço do Dataflow de usar todos os workers disponíveis. Nesses casos, é possível evitar que algumas operações sejam mescladas.

Considere o exemplo de código do Apache Beam a seguir. Uma transformação GenerateSequence cria um pequeno objeto PCollection limitado, que depois será processado por duas transformações ParDo downstream.

import com.google.common.math.LongMath;
...

public class FusedStepsPipeline {

  final class FindLowerPrimesFn extends DoFn<Long, String> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      Long n = c.element();
      if (n > 1) {
        for (long i = 2; i < n; i++) {
          if (LongMath.isPrime(i)) {
            c.output(Long.toString(i));
          }
        }
      }
    }
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(options);

    PCollection<Long> sequence = p.apply("Generate Sequence",
        GenerateSequence
            .from(0)
            .to(1000000));

    // Pipeline branch 1
    sequence.apply("Find Primes Less-than-N",
        ParDo.of(new FindLowerPrimesFn()));

    // Pipeline branch 2
    sequence.apply("Increment Number",
        MapElements.via(new SimpleFunction<Long, Long>() {
          public Long apply(Long n) {
            return ++n;
          }
        }));

    p.run().waitUntilFinish();
  }
}

A transformação Find Primes Less-than-N pode ser cara em termos de computação e provavelmente será executada lentamente para números grandes. Por outro lado, espera-se que a transformação Increment Number seja concluída rapidamente.

O diagrama a seguir mostra uma representação gráfica do pipeline na interface de monitoramento do Dataflow.

Representação do fluxo do pipeline na interface do Dataflow.

O monitoramento do job usando a interface de monitoramento do Dataflow mostra a mesma taxa de processamento lenta para as duas transformações, ou seja, 13 elementos por segundo. Espera-se que a transformação Increment Number processe elementos rapidamente, mas parece que ela está vinculada à mesma taxa de processamento que Find Primes Less-than-N.

O motivo é que o Dataflow mesclava as etapas em uma única fase, o que as impedia de serem executadas de maneira independente. É possível usar o seguinte comando gcloud:

gcloud dataflow jobs describe --full job-id --format json

Na saída resultante, as etapas combinadas são descritas no objeto ExecutionStageSummary na matriz ComponentTransform :

...

    "executionPipelineStage": [
      {
        "componentSource": [
          ...
        ],
        "componentTransform": [
          {
            "name": "s1",
            "originalTransform": "Generate Sequence/Read(BoundedCountingSource)",
            "userName": "Generate Sequence/Read(BoundedCountingSource)"
          },
          {
            "name": "s2",
            "originalTransform": "Find Primes Less-than-N",
            "userName": "Find Primes Less-than-N"
          },
          {
            "name": "s3",
            "originalTransform": "Increment Number/Map",
            "userName": "Increment Number/Map"
          }
        ],
        "id": "S01",
        "kind": "PAR_DO_KIND",
        "name": "F0"
      }

...

Nesse cenário, a transformação Find Primes Less-than-N é a etapa lenta. Portanto, a quebra da mesclagem antes dessa etapa é uma estratégia apropriada. Um método para separar as etapas é inserir uma transformação GroupByKey e realizar o desagrupamento antes da etapa, conforme mostrado no exemplo de código Java a seguir:

sequence
    .apply("Map Elements", MapElements.via(new SimpleFunction<Long, KV<Long, Void>>() {
      public KV<Long, Void> apply(Long n) {
        return KV.of(n, null);
      }
    }))
    .apply("Group By Key", GroupByKey.<Long, Void>create())
    .apply("Emit Keys", Keys.<Long>create())
    .apply("Find Primes Less-than-N", ParDo.of(new FindLowerPrimesFn()));

Também é possível combinar essas etapas em uma transformação composta reutilizável.

Depois de separar a etapa, ao executar o pipeline, você verá que Increment Number será concluído em segundos e que a transformação Find Primes Less-than-N, muito mais longa, será executada em uma etapa separada.

Este exemplo aplica uma operação de agrupamento e desagrupamento para propagar as etapas. É possível usar outras abordagens para outras circunstâncias. Nesse caso, o processamento da saída duplicada não é um problema, dada a saída consecutiva da transformação GenerateSequence. Objetos KV com chaves duplicadas são têm a duplicação eliminada para uma única chave nas transformações de agrupamento (GroupByKey) e de separação (Keys). Para reter cópias após as operações de agrupamento e desagrupamento, é possível criar pares KV usando uma chave aleatória e a entrada original como o valor, realizar o agrupamento usando a chave aleatória e emitir os valores de cada chave como saída.

Usar métricas do Apache Beam para coletar insights do pipeline

As métricas do Apache Beam são uma classe de utilitários que produzem várias métricas e geram relatórios das propriedades de um pipeline em execução. Quando você usa o Cloud Monitoring, as métricas do Apache Beam ficam disponíveis como métricas personalizadas do Cloud Monitoring.

O snippet Java a seguir é um exemplo de métricas Counter usadas em uma subclasse DoFn.

final TupleTag<String> errorTag = new TupleTag<String>(){};
final TupleTag<MockObject> successTag = new TupleTag<MockObject>(){};

final class ParseEventFn extends DoFn<String, MyObject> {

  private final Counter malformedCounter = Metrics.counter(ParseEventFn.class, "malformedJson");
  private final Counter emptyCounter = Metrics.counter(ParseEventFn.class, "emptyPayload");
  private Gson gsonParser;

  @Setup
  public setup() {
    gsonParser = new Gson();
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    try {
      MyObject myObj = gsonParser.fromJson(c.element(), MyObject.class);
      if (myObj.getPayload() != null) {
        //  Output the element if non-empty payload
        c.output(successTag, myObj);
      }
      else {
        // Increment empty payload counter
        emptyCounter.inc();
      }
    }
    catch (JsonParseException e) {
      // Increment malformed JSON counter
      malformedCounter.inc();
      // Output the element to dead-letter queue
      c.output(errorTag, c.element());
    }
  }
}

O código de exemplo usa dois contadores. Um contador rastreia falhas de análise JSON (malformedCounter) e o outro monitora se a mensagem JSON é válida, mas contém um payload vazio (emptyCounter). No Cloud Monitoring, os nomes das métricas personalizadas são custom.googleapis.com/dataflow/malformedJson e custom.googleapis.com/dataflow/emptyPayload. Use as métricas personalizadas para criar visualizações e políticas de alertas no Cloud Monitoring.

Testar seu pipeline

No desenvolvimento de software, testes de unidade, testes de integração e testes completos são tipos comuns de teste de software. Esses tipos de teste também são aplicáveis a pipelines de dados.

O SDK do Apache Beam fornece funcionalidades para ativar esses testes. O ideal é que cada tipo de teste seja direcionado a um ambiente de implantação diferente. O diagrama a seguir ilustra como os testes de unidade, os testes de integração e os testes completos se aplicam a diferentes partes do pipeline e dos dados.

Tipos de teste e como eles se relacionam com transformações, pipelines, fontes de dados e coletores de dados.

O diagrama mostra o escopo de diferentes testes e como eles se relacionam com transformações (subclasses DoFn e PTransform), pipelines, fontes de dados e coletores de dados.

As seções a seguir descrevem como vários testes formais de software se aplicam a pipelines de dados usando o Dataflow. Ao continuar a leitura desta seção, consulte o diagrama acima para entender como os diferentes tipos de teste estão relacionados.

Amostragem de dados

Para observar os dados em cada etapa de um pipeline do Dataflow, ative a amostragem de dados durante o teste. Isso permite visualizar as saídas das transformações para garantir que estejam corretas.

Testes de unidade

Os testes de unidade avaliam o funcionamento correto de subclasses DoFn e de transformações compostas (subclasse PTransform) comparando a saída dessas transformações com um conjunto verificado de entradas e saídas de dados. Normalmente, os desenvolvedores podem executar esses testes no ambiente local. Eles também podem ser executados automaticamente na automação de teste de unidade usando integração contínua (CI) no ambiente de criação.

O Direct Runner executa testes de unidade com um subconjunto menor de dados de teste de referência que se concentram em testar a lógica de negócios das transformações. Os dados de teste precisam ser pequenos o suficiente para caber na memória local da máquina que executa o teste.

O SDK do Apache Beam fornece uma regra JUnit chamada TestPipeline para transformações individuais de teste de unidade (subclasses DoFn), transformações compostas (subclasses PTransform) e pipelines inteiros. É possível usar TestPipeline em um executor de pipeline do Apache Beam, como o Direct Runner ou o Dataflow Runner, para aplicar declarações no conteúdo de objetos PCollection usando PAssert, conforme mostrado no snippet de código a seguir de uma classe de teste JUnit:

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void myPipelineTest() throws Exception {
  final PCollection<String> pcol = p.apply(...)
  PAssert.that(pcol).containsInAnyOrder(...);
  p.run();
}

Testes de unidade para transformações individuais

Ao fatorar o código em transformações reutilizáveis (por exemplo, como classes aninhadas de nível superior ou estáticas), é possível criar testes segmentados para diferentes partes do pipeline. Além dos benefícios do teste, as transformações reutilizáveis promovem a manutenção e a reutilização de códigos, encapsulando a lógica de negócios do pipeline naturalmente em partes de componentes. Por outro lado, testar partes individuais do pipeline pode ser difícil quando o pipeline usa classes internas anônimas para implementar transformações.

O snippet Java a seguir mostra a implementação de transformações como classes internas anônimas, o que não permite testes facilmente.

PipelineOptions options = PipelineOptionsFactory.create();

Pipeline p = Pipeline.create(options)

PCollection<Integer> output =
    p.apply("Read from text", TextIO.Read.from(...))
        .apply("Split words", ParDo.of(new DoFn() {
          // Untestable anonymous transform 1
        }))
        .apply("Generate anagrams", ParDo.of(new DoFn() {
          // Untestable anonymous transform 2
        }))
        .apply("Count words", Count.perElement());

Compare o exemplo anterior com o seguinte, em que as classes internas anônimas foram refatoradas em subclasses DoFn concretas. É possível criar testes de unidade individuais para cada subclasse DoFn concreta que compõe o pipeline completo.

PipelineOptions options = PipelineOptionsFactory.create();

Pipeline p = Pipeline.create(options)

PCollection<Integer> output =
    p.apply("Read from text", TextIO.Read.from(...))
        .apply("Split words", ParDo.of(new SplitIntoWordsFn()))
        .apply("Generate anagrams", ParDo.of(new GenerateAnagramsFn()))
        .apply("Count words", Count.perElement());

O teste de cada subclasse DoFn é semelhante ao teste de unidade de um pipeline em lote que contém uma única transformação. Use a transformação Create para criar um objeto PCollection de dados de teste e transmita-o para o objeto DoFn. Use PAssert para declarar que o conteúdo do objeto PCollection está correto. O exemplo de código Java a seguir usa a classe PAssert para verificar o formato de saída correto.

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void testGenerateAnagramsFn() {
    // Create the test input
    PCollection<String> words = p.apply(Create.of("friend"));

    // Test a single DoFn using the test input
    PCollection<String> anagrams =
        words.apply("Generate anagrams", ParDo.of(new GenerateAnagramsFn()));

    // Assert correct output from
    PAssert.that(anagrams).containsInAnyOrder(
        "finder", "friend", "redfin", "refind");

    p.run();
}

Testes de integração

Os testes de integração verificam o funcionamento correto de todo o pipeline. Considere os seguintes tipos de teste de integração:

  • Um teste de integração de transformação que avalia a funcionalidade integrada de todas as transformações individuais que compõem o pipeline de dados. Pense nos testes de integração de transformação como um teste de unidade para todo o pipeline, exceto a integração com coletores e fontes de dados externas. O SDK do Beam fornece métodos para fornecer dados de teste ao pipeline de dados e para verificar os resultados do processamento. O Direct Runner é usado para executar testes de integração de transformação.
  • Um teste de integração do sistema que avalia a integração do pipeline de dados com coletores e fontes de dados em tempo real. Para que o pipeline se comunique com sistemas externos, você precisa configurar os testes com as credenciais apropriadas para acessar serviços externos. Os pipelines de streaming são executados indefinidamente. Portanto, você precisa decidir quando e como interromper o pipeline em execução. Ao usar o Direct Runner para executar testes de integração do sistema, você verifica rapidamente a integração entre o pipeline e outros sistemas sem precisar enviar um job do Dataflow e aguardar a conclusão dele.

Projete testes de transformação e integração de sistemas para fornecer detecção de defeitos e feedback rápidos sem atrasar a produtividade do desenvolvedor. Para testes de longa duração, como os executados como jobs do Dataflow, use um teste completo que seja executado com menos frequência.

Pense em um pipeline de dados como uma ou mais transformações relacionadas. Portanto, é possível criar uma transformação composta de encapsulamento para o pipeline e usar TestPipeline para realizar um teste de integração de todo o pipeline. De acordo com sua preferência quanto ao teste do pipeline no modo em lote ou em streaming, forneça dados de teste usando as transformações Create ou TestStream.

Usar dados de teste para testes de integração

No ambiente de produção, o pipeline provavelmente se integra a diferentes fontes de dados e coletores. No entanto, para testes de unidade e testes de integração de transformação, concentre-se na verificação da lógica de negócios do código do pipeline, fornecendo entradas de teste e verificando a saída diretamente. Além de simplificar os testes, essa abordagem permite isolar problemas específicos do pipeline daqueles que podem ser causados por fontes de dados e coletores.

Testar pipelines em lote

Para pipelines em lote, use a transformação Create para criar um objeto PCollection dos dados de teste de entrada com base em uma coleção padrão na memória, como um objeto Java List. Usar a transformação Create é apropriado quando os dados de teste são pequenos o suficiente para serem incluídos no código. Em seguida, use PAssert nos objetos PCollection de saída para determinar a exatidão do código do pipeline. Essa abordagem é compatível com o Direct Runner e com o Dataflow Runner.

O snippet de código Java a seguir mostra declarações sobre objetos PCollection de saída de uma transformação composta que inclui algumas ou todas as transformações individuais que compõem um pipeline (WeatherStatsPipeline). A abordagem é semelhante ao teste de unidade de transformações individuais em um pipeline.

private class WeatherStatsPipeline extends
    PTransform<PCollection<Integer>, PCollection<WeatherSummary>> {
  @Override
  public PCollection<WeatherSummary> expand(PCollection<Integer> input) {
    // Pipeline transforms …
  }
}

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void testWeatherPipeline() {
  // Create test input consisting of temperature readings
  PCollection<Integer> tempCelsius =
      p.apply(Create.of(24, 22, 20, 22, 21, 21, 20));

  // CalculateWeatherStats calculates the min, max, and average temperature
  PCollection<WeatherSummary> result =
      tempCelsius.apply("Calculate weather statistics", new WeatherStatsPipeline());

   // Assert correct output from CalculateWeatherStats
   PAssert.thatSingleton(result).isEqualTo(new WeatherSummary.Builder()
       .withAverageTemp(21)
       .withMaxTemp(24)
       .withMinTemp(20)
       .build());

   p.run();
}

Para testar o comportamento de gestão de janelas, também é possível usar a transformação Create para criar elementos com carimbos de data/hora, conforme mostrado no snippet de código a seguir:

private static final Duration WINDOW_DURATION = Duration.standardMinutes(3);

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void testWindowedData() {
    PCollection<String> input =
        p.apply(
            Create.timestamped(
                    TimestampedValue.of("a", new Instant(0L)),
                    TimestampedValue.of("a", new Instant(0L)),
                    TimestampedValue.of("b", new Instant(0L)),
                    TimestampedValue.of("c", new Instant(0L)),
                    TimestampedValue.of("c", new Instant(0L).plus(WINDOW_DURATION)))
                .withCoder(StringUtf8Coder.of()));

   PCollection<KV<String, Long>> windowedCount =
       input
           .apply(Window.into(FixedWindows.of(WINDOW_DURATION)))
           .apply(Count.perElement());

    PAssert.that(windowedCount)
        .containsInAnyOrder(
            // Output from first window
            KV.of("a", 2L),
            KV.of("b", 1L),
            KV.of("c", 1L),
            // Output from second window
            KV.of("c", 1L));

   p.run();
}

Testar pipelines de streaming

Os pipelines de streaming contêm suposições que definem como processar dados ilimitados. Essas suposições geralmente são sobre a pontualidade dos dados em condições reais e, portanto, têm impacto na correção, dependendo se as suposições são verdadeiras ou falsas. O ideal é que os testes de integração para pipelines de streaming incluam testes que simulem a natureza não determinista da chegada de dados de streaming.

Para ativar esses testes, o SDK do Apache Beam fornece a classe TestStream a fim de modelar os efeitos da velocidade do elemento (dados antecipados, pontuais ou atrasados) nos resultados do pipeline de dados. Use esses testes com a classe PAssert para verificar os resultados esperados.

TestStream é compatível com o Direct Runner e o Dataflow Runner. A amostra de código a seguir cria uma transformação TestStream:

final Duration WINDOW_DURATION = Duration.standardMinutes(3);

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void testDroppedLateData() {
   TestStream<String> input = TestStream.create(StringUtf8Coder.of())
      // Add elements arriving before the watermark
      .addElements(
         TimestampedValue.of("a", new Instant(0L)),
         TimestampedValue.of("a", new Instant(0L)),
         TimestampedValue.of("b", new Instant(0L)),
         TimestampedValue.of("c", new Instant(0L).plus(Duration.standardMinutes(3))))
         // Advance the watermark past the end of the window
      .advanceWatermarkTo(new Instant(0L).plus(WINDOW_DURATION).plus(Duration.standardMinutes(1)))
      // Add elements which will be dropped due to lateness
      .addElements(
         TimestampedValue.of("c", new Instant(0L)))
      // Advance the watermark to infinity which will close all windows
      .advanceWatermarkToInfinity();

      PCollection<KV<String, Long>> windowedCount =
          p.apply(input)
             .apply(Window.into(FixedWindows.of(WINDOW_DURATION)))
             .apply(Count.perElement());

   PAssert.that(windowedCount)
      .containsInAnyOrder(
          // Output from first window
          KV.of("a", 2L),
          KV.of("b", 1L),
          KV.of("c", 1L));

   p.run();
}

Para mais informações sobre TestStream, consulte Como testar pipelines ilimitados no Apache Beam. Para mais informações sobre como usar o SDK do Apache Beam para testes de unidade, consulte a documentação do Apache Beam.

Usar os serviços do Google Cloud em testes de integração

O Direct Runner pode ser integrado aos serviços do Google Cloud para que testes ad hoc no ambiente local e testes de integração do sistema usem o Pub/Sub, o BigQuery e outros serviços conforme necessário. Quando você usa o Direct Runner, o pipeline é executado como a conta de usuário configurada usando a ferramenta de linha de comando gcloud ou como uma conta de serviço que você especificou usando a variável de ambiente GOOGLE_APPLICATION_CREDENTIALS. Portanto, é preciso conceder permissões suficientes a essa conta para todos os recursos necessários antes de executar o pipeline. Para mais informações, consulte Segurança e permissões do Dataflow.

Para testes de integração totalmente locais, é possível usar emuladores locais para alguns serviços do Google Cloud. Os emuladores locais estão disponíveis para Pub/Sub e Bigtable.

Para testes de integração do sistema de pipelines de streaming, use o método setBlockOnRun (definido na interface DirectOptions) para que o Direct Runner execute o pipeline de maneira assíncrona. Caso contrário, a execução do pipeline bloqueará o processo pai de chamada (por exemplo, um script no pipeline de build) até que o pipeline seja interrompido manualmente. Se você executar o pipeline de maneira assíncrona, use a instância PipelineResult retornada para cancelar a execução do pipeline, conforme mostrado no exemplo de código a seguir:

public interface StreamingIntegrationTestOptions extends
   DirectOptions, StreamingOptions, MyOtherPipelineOptions {
   ...
}

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void testNonBlockingPipeline() {
    StreamingIntegrationTestOptions options =
        p.getOptions().as(StreamingIntegrationOptions.class);

    options.setBlockOnRun(false); // Set non-blocking pipeline execution
    options.setStreaming(true); // Set streaming mode

    p.apply(...); // Apply pipeline transformations

    PipelineResult result = p.run(); // Run the pipeline

    // Generate input, verify output, etc
    ...

    // Later on, cancel the pipeline using the previously returned
    result.cancel();
}

Testes completos

Testes completos verificam a operação correta do pipeline completo executando-o no Dataflow Runner em condições semelhantes à produção. Os testes verificam se a lógica de negócios funciona corretamente usando o Dataflow Runner e se o pipeline tem o desempenho esperado em cargas semelhantes às de produção. Normalmente, os testes completos são executados em um projeto dedicado do Google Cloud que foi designado como o ambiente de pré-produção.

Para testar o pipeline em diferentes escalas, use diferentes tipos de testes completos, por exemplo:

  • Execute testes completos de pequena escala com uma pequena proporção (como 1%) do conjunto de dados de teste para validar rapidamente a funcionalidade do pipeline no ambiente de pré-produção.
  • Execute testes completos em um conjunto de dados de teste completo para validar a funcionalidade do pipeline em volumes e condições de dados semelhantes à de produção.

Para pipelines de streaming, recomendamos que você execute pipelines de teste em paralelo com o pipeline de produção se eles puderem usar os mesmos dados. Isso permite comparar os resultados e o comportamento operacional, como escalonamento automático e desempenho.

Os testes completos ajudam a prever como o pipeline atenderá aos SLOs de produção. O ambiente de pré-produção testa o pipeline em condições semelhantes à produção. Em testes completos, os pipelines são executados usando o Dataflow Runner para processar conjuntos de dados de referência completos que correspondem (ou são muito semelhantes a) conjuntos de dados na produção.

Talvez não seja possível gerar dados sintéticos para testes que simulem com precisão dados reais. Para resolver esse problema, uma abordagem é usar extrações limpas de fontes de dados de produção para criar conjuntos de dados de referência, em que todos os dados confidenciais são desidentificados por meio de transformações apropriadas. Recomendamos usar a Proteção de dados sensíveis para essa finalidade. A Proteção de dados sensíveis pode detectar dados sensíveis em vários tipos de conteúdo e fontes de dados e aplicar várias técnicas de desidentificação, incluindo encobrimento, mascaramento, criptografia de preservação de formato e mudança de data.

Diferenças em testes completos para pipelines em lote e de streaming

Antes de executar um teste completo em um grande conjunto de dados de teste, faça um teste com uma porcentagem menor desses dados, por exemplo, um por cento, e verifique o comportamento esperado em um tempo mais curto. Assim como nos testes de integração que usam o Direct Runner, é possível usar PAssert em objetos PCollection ao executar pipelines usando o Dataflow Runner. Para saber mais sobre PAssert, consulte a seção Testes de unidade nesta página.

Dependendo do caso de uso, a verificação de saídas muito grandes de testes completos pode ser inviável, cara ou complicada. Nesse caso, é possível verificar amostras representativas do conjunto de resultados de saída. Por exemplo, é possível usar o BigQuery para testar e comparar linhas de saída a um conjunto de dados de referência dos resultados esperados.

Para pipelines de streaming, a simulação de condições realistas de streaming com dados sintéticos pode complicada. Uma maneira comum de fornecer dados de streaming para testes completos é integrar testes a fontes de dados de produção. Se você estiver usando o Pub/Sub como fonte de dados, poderá ativar um fluxo de dados separado para testes completos usando assinaturas adicionais de tópicos existentes. Em seguida, é possível comparar os resultados de pipelines diferentes que consomem os mesmos dados, o que é útil para comparar pipelines candidatos a outros pipelines de pré-produção e produção.

O diagrama a seguir mostra como esse método permite que um pipeline de produção e um pipeline de teste sejam executados em paralelo em diferentes ambientes de implantação.

Executar um pipeline de teste em paralelo a um pipeline de produção usando uma única origem de streaming do Pub/Sub.

No diagrama, os dois pipelines leem o mesmo tópico do Pub/Sub, mas usam assinaturas separadas. Isso permite que os dois pipelines processem os mesmos dados de maneira independente e permite comparar os resultados. O pipeline de teste usa uma conta de serviço separada do projeto de produção e, portanto, evita o uso da cota de assinante do Pub/Sub no projeto de produção.

Ao contrário dos pipelines em lote, os pipelines de streaming continuam em execução até serem cancelados explicitamente. Nos testes completos, você precisa decidir se quer deixar o pipeline em execução, talvez até a execução do próximo teste completo, ou cancelar o pipeline em um momento que represente a conclusão do teste. Assim, é possível inspecionar os resultados.

O tipo de dados de teste que você usa influencia essa decisão. Por exemplo, se você usar um conjunto vinculado de dados de teste fornecido ao pipeline de streaming, poderá cancelar o pipeline quando todos os elementos tiverem concluído o processamento. Como alternativa, se você usar uma fonte de dados real (como um tópico do Pub/Sub usado na produção) ou gerar dados de teste continuamente, é preciso manter os pipelines de teste em execução por um período maior. A última opção permite comparar o comportamento com o ambiente de produção ou até mesmo com outros pipelines de teste.