Desenvolver e testar pipelines do Dataflow

Nesta página, apresentamos as práticas recomendadas para desenvolver e testar seu pipeline do Dataflow.

Visão geral

A forma como o código do pipeline é implementado tem uma influência significativa no desempenho do pipeline na produção. Para ajudar você a criar um código de pipeline que funcione corretamente e com eficiência, este documento explica os seguintes itens:

  • Executores de pipeline que oferecem suporte à execução de código nos diferentes estágios de desenvolvimento e implantação.
  • Ambientes de implantação que permitem executar pipelines durante o desenvolvimento, teste, pré-produção e produção.
  • Código de pipeline de código aberto e modelos prontos para uso ou que podem ser usados como base de novos pipelines para acelerar o desenvolvimento de código.
  • Uma abordagem de práticas recomendadas para testar o código do pipeline. Primeiro, este documento fornece uma visão geral que inclui o escopo e o relacionamento de diferentes tipos de teste, como testes de unidade, testes de integração e testes completos. Em segundo lugar, cada tipo de teste é explorado em detalhes, incluindo métodos para criar e integrar com dados de teste e quais executores de pipeline usar para cada teste.

Executores de pipelines

Durante o desenvolvimento e os testes, você usa diferentes executores do Apache Beam para gerar o código do pipeline. O SDK do Apache Beam fornece o Direct Runner, um executor direto para desenvolvimento e teste locais. Suas ferramentas de automação de lançamento também podem usar o Direct Runner para testes de unidade e integração. Por exemplo, é possível usar o Direct Runner no pipeline de integração contínua (CI).

Os pipelines implantados no Dataflow usam o Dataflow Runner, que executa o pipeline em ambientes semelhantes a ambientes de produção. Além disso, é possível usar o Dataflow Runner para testes de desenvolvimento ad hoc e testes de pipeline completos.

Embora esta página se concentre na execução de pipelines criados com o SDK do Apache Beam para Java, o Dataflow também oferece suporte aos pipelines do Apache Beam que foram desenvolvidos usando Python e Go. Os SDKs do Apache Beam para Java, Python e Go estão com disponibilidade geral no Dataflow. Os desenvolvedores de SQL também podem usar a SQL do Apache Beam para criar pipelines que usam dialetos de SQL conhecidos.

Configurar um ambiente de implantação

Para separar usuários, dados, códigos e outros recursos em diferentes estágios de desenvolvimento, crie ambientes de implantação. Sempre que possível, para fornecer ambientes isolados nos diferentes estágios de desenvolvimento do pipeline, use projetos do Google Cloud separados.

As seções a seguir descrevem um conjunto típico de ambientes de implantação.

Ambiente local

O ambiente local é a estação de trabalho de um desenvolvedor. Para desenvolvimento e testes rápidos, use o Direct Runner para executar o código do pipeline localmente.

Os pipelines executados localmente usando o Direct Runner podem interagir com recursos remotos do Google Cloud, como tópicos do Pub/Sub ou tabelas do BigQuery. Dê aos desenvolvedores individuais projetos separados do Google Cloud para que eles tenham um sandbox para testes ad hoc com os serviços do Google Cloud.

Alguns serviços do Google Cloud, como o Pub/Sub e o Bigtable, fornecem emuladores para desenvolvimento local. É possível usar esses emuladores com o Direct Runner para ativar o desenvolvimento e o teste locais completos.

Ambiente de sandbox

O ambiente de sandbox é um projeto do Google Cloud que fornece aos desenvolvedores acesso aos serviços do Google Cloud durante o desenvolvimento do código. Os desenvolvedores de pipeline podem compartilhar um projeto do Google Cloud com outros desenvolvedores ou usar os próprios projetos individuais. O uso de projetos individuais reduz a complexidade do planejamento em relação ao uso de recursos compartilhados e ao gerenciamento de cotas.

Os desenvolvedores usam o ambiente de sandbox para a execução ad hoc do pipeline usando o Dataflow Runner. O ambiente sandbox é útil para depurar e testar códigos em um executor de produção durante a fase de desenvolvimento de código. Por exemplo, a execução ad-hoc do pipeline permite que os desenvolvedores:

  • observem o efeito das alterações do código no comportamento de escalonamento;
  • entendam as possíveis diferenças entre o comportamento do Direct Runner e do Dataflow Runner;
  • entendam como o Dataflow aplica as otimizações de gráfico.

Em testes ad hoc, os desenvolvedores podem implantar códigos do ambiente local para executar o Dataflow no ambiente de sandbox.

Ambiente de pré-produção

O ambiente de pré-produção serve para as fases de desenvolvimento que precisam ser executadas em condições de produção, como testes completos. Use um projeto separado para o ambiente de pré-produção e o configure para ser o mais semelhante possível ao de produção. Da mesma forma, para permitir testes completos em escala de produção, torne as cotas de projetos do Google Cloud para o Dataflow e outros serviços o mais semelhante possível ao ambiente de produção.

Dependendo dos requisitos, é possível separar ainda mais a pré-produção em vários ambientes. Por exemplo, um ambiente de controle de qualidade pode dar suporte ao trabalho de analistas de qualidade para testar objetivos de nível de serviço (SLOs), como precisão de dados, atualização e desempenho em diferentes condições de carga de trabalho.

Os testes completos incluem integração com fontes de dados e coletores dentro do escopo dos testes. Considere como disponibilizá-los no ambiente de pré-produção. É possível armazenar dados de teste no próprio ambiente de pré-produção. Por exemplo, os dados de teste são armazenados em um bucket do Cloud Storage com seus dados de entrada. Em outros casos, os dados de teste podem ser originados fora do ambiente de pré-produção, como um tópico do Pub/Sub por meio de uma assinatura separada que está no ambiente de produção. Para pipelines de streaming, também é possível executar testes completos usando dados gerados, por exemplo, com o Gerador de dados de streaming do Dataflow, para emular características e volumes de dados semelhantes à produção.

Para pipelines de streaming, use o ambiente de pré-produção para testar atualizações do pipeline antes que qualquer alteração seja feita na produção. É importante testar e verificar os procedimentos de atualização para pipelines de streaming, especialmente se você precisar coordenar várias etapas, como ao executar pipelines paralelos para evitar inatividade.

Ambiente de produção

O ambiente de produção é um projeto dedicado do Google Cloud. A entrega contínua copia os artefatos de implantação para o ambiente de produção quando todos os testes completos são aprovados.

Práticas recomendadas de desenvolvimento

Consulte as Práticas recomendadas para pipelines do Dataflow.

Teste o pipeline

No desenvolvimento de software, testes de unidade, testes de integração e testes completos são tipos comuns de teste de software. Esses tipos de teste também são aplicáveis a pipelines de dados.

O SDK do Apache Beam fornece funcionalidades para ativar esses testes. O ideal é que cada tipo de teste seja direcionado a um ambiente de implantação diferente. O diagrama a seguir ilustra como os testes de unidade, os testes de integração e os testes completos se aplicam a diferentes partes do pipeline e dos dados.

Tipos de teste e como eles se relacionam com transformações, pipelines, fontes de dados e coletores de dados.

O diagrama mostra o escopo de diferentes testes e como eles se relacionam com transformações (subclasses DoFn e PTransform), pipelines, fontes de dados e coletores de dados.

As seções a seguir descrevem como vários testes formais de software se aplicam a pipelines de dados usando o Dataflow. Ao continuar a leitura desta seção, consulte o diagrama acima para entender como os diferentes tipos de teste estão relacionados.

Amostragem de dados

Para observar os dados em cada etapa de um pipeline do Dataflow, ative a amostragem de dados durante o teste. Com isso, é possível visualizar as saídas das transformações para garantir que estejam corretas.

Testes de unidade

Os testes de unidade avaliam o funcionamento correto de subclasses DoFn e de transformações compostas (subclasse PTransform) comparando a saída dessas transformações com um conjunto verificado de entradas e saídas de dados. Normalmente, os desenvolvedores podem executar esses testes no ambiente local. Eles também podem ser executados automaticamente na automação de teste de unidade usando integração contínua (CI) no ambiente de criação.

O Direct Runner executa testes de unidade com um subconjunto menor de dados de teste de referência que se concentram em testar a lógica de negócios das transformações. Os dados de teste precisam ser pequenos o suficiente para caber na memória local da máquina que executa o teste.

O SDK do Apache Beam fornece uma regra JUnit chamada TestPipeline para transformações individuais de teste de unidade (subclasses DoFn), transformações compostas (subclasses PTransform) e pipelines inteiros. É possível usar TestPipeline em um executor de pipeline do Apache Beam, como o Direct Runner ou o Dataflow Runner, para aplicar declarações no conteúdo de objetos PCollection usando PAssert, conforme mostrado no snippet de código a seguir de uma classe de teste JUnit:

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void myPipelineTest() throws Exception {
  final PCollection<String> pcol = p.apply(...)
  PAssert.that(pcol).containsInAnyOrder(...);
  p.run();
}

Testes de unidade para transformações individuais

Ao fatorar o código em transformações reutilizáveis (por exemplo, como classes aninhadas de nível superior ou estáticas), é possível criar testes segmentados para diferentes partes do pipeline. Além dos benefícios do teste, as transformações reutilizáveis promovem a manutenção e a reutilização de códigos, encapsulando a lógica de negócios do pipeline naturalmente em partes de componentes. Por outro lado, testar partes individuais do pipeline pode ser difícil quando o pipeline usa classes internas anônimas para implementar transformações.

O snippet Java a seguir mostra a implementação de transformações como classes internas anônimas, o que não permite testes facilmente.

PipelineOptions options = PipelineOptionsFactory.create();

Pipeline p = Pipeline.create(options)

PCollection<Integer> output =
    p.apply("Read from text", TextIO.Read.from(...))
        .apply("Split words", ParDo.of(new DoFn() {
          // Untestable anonymous transform 1
        }))
        .apply("Generate anagrams", ParDo.of(new DoFn() {
          // Untestable anonymous transform 2
        }))
        .apply("Count words", Count.perElement());

Compare o exemplo anterior com o seguinte, em que as classes internas anônimas foram refatoradas em subclasses DoFn concretas. É possível criar testes de unidade individuais para cada subclasse DoFn concreta que compõe o pipeline completo.

PipelineOptions options = PipelineOptionsFactory.create();

Pipeline p = Pipeline.create(options)

PCollection<Integer> output =
    p.apply("Read from text", TextIO.Read.from(...))
        .apply("Split words", ParDo.of(new SplitIntoWordsFn()))
        .apply("Generate anagrams", ParDo.of(new GenerateAnagramsFn()))
        .apply("Count words", Count.perElement());

O teste de cada subclasse DoFn é semelhante ao teste de unidade de um pipeline em lote que contém uma única transformação. Use a transformação Create para criar um objeto PCollection de dados de teste e transmita-o para o objeto DoFn. Use PAssert para declarar que o conteúdo do objeto PCollection está correto. O exemplo de código Java a seguir usa a classe PAssert para verificar o formato de saída correto.

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void testGenerateAnagramsFn() {
    // Create the test input
    PCollection<String> words = p.apply(Create.of("friend"));

    // Test a single DoFn using the test input
    PCollection<String> anagrams =
        words.apply("Generate anagrams", ParDo.of(new GenerateAnagramsFn()));

    // Assert correct output from
    PAssert.that(anagrams).containsInAnyOrder(
        "finder", "friend", "redfin", "refind");

    p.run();
}

Testes de integração

Os testes de integração verificam o funcionamento correto de todo o pipeline. Considere os seguintes tipos de teste de integração:

  • Um teste de integração de transformação que avalia a funcionalidade integrada de todas as transformações individuais que compõem o pipeline de dados. Pense nos testes de integração de transformação como um teste de unidade para todo o pipeline, exceto a integração com coletores e fontes de dados externas. O SDK do Beam fornece métodos para fornecer dados de teste ao pipeline de dados e para verificar os resultados do processamento. O Direct Runner é usado para executar testes de integração de transformação.
  • Um teste de integração do sistema que avalia a integração do pipeline de dados com coletores e fontes de dados em tempo real. Para que o pipeline se comunique com sistemas externos, você precisa configurar os testes com as credenciais apropriadas para acessar serviços externos. Os pipelines de streaming são executados indefinidamente. Portanto, você precisa decidir quando e como interromper o pipeline em execução. Ao usar o Direct Runner para executar testes de integração do sistema, você verifica rapidamente a integração entre o pipeline e outros sistemas sem precisar enviar um job do Dataflow e aguardar a conclusão dele.

Projete testes de transformação e integração de sistemas para fornecer detecção de defeitos e feedback rápidos sem atrasar a produtividade do desenvolvedor. Para testes de longa duração, como os executados como jobs do Dataflow, use um teste completo que seja executado com menos frequência.

Pense em um pipeline de dados como uma ou mais transformações relacionadas. Portanto, é possível criar uma transformação composta de encapsulamento para o pipeline e usar TestPipeline para realizar um teste de integração de todo o pipeline. De acordo com sua preferência quanto ao teste do pipeline no modo em lote ou em streaming, forneça dados de teste usando as transformações Create ou TestStream.

Usar dados de teste para testes de integração

No ambiente de produção, o pipeline provavelmente se integra a diferentes fontes de dados e coletores. No entanto, para testes de unidade e testes de integração de transformação, concentre-se na verificação da lógica de negócios do código do pipeline, fornecendo entradas de teste e verificando a saída diretamente. Além de simplificar os testes, essa abordagem permite isolar problemas específicos do pipeline daqueles que podem ser causados por fontes de dados e coletores.

Testar pipelines em lote

Para pipelines em lote, use a transformação Create para criar um objeto PCollection dos dados de teste de entrada com base em uma coleção padrão na memória, como um objeto Java List. Usar a transformação Create é apropriado quando os dados de teste são pequenos o suficiente para serem incluídos no código. Em seguida, use PAssert nos objetos PCollection de saída para determinar a exatidão do código do pipeline. Essa abordagem é compatível com o Direct Runner e com o Dataflow Runner.

O snippet de código Java a seguir mostra declarações sobre objetos PCollection de saída de uma transformação composta que inclui algumas ou todas as transformações individuais que compõem um pipeline (WeatherStatsPipeline). A abordagem é semelhante ao teste de unidade de transformações individuais em um pipeline.

private class WeatherStatsPipeline extends
    PTransform<PCollection<Integer>, PCollection<WeatherSummary>> {
  @Override
  public PCollection<WeatherSummary> expand(PCollection<Integer> input) {
    // Pipeline transforms …
  }
}

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void testWeatherPipeline() {
  // Create test input consisting of temperature readings
  PCollection<Integer> tempCelsius =
      p.apply(Create.of(24, 22, 20, 22, 21, 21, 20));

  // CalculateWeatherStats calculates the min, max, and average temperature
  PCollection<WeatherSummary> result =
      tempCelsius.apply("Calculate weather statistics", new WeatherStatsPipeline());

   // Assert correct output from CalculateWeatherStats
   PAssert.thatSingleton(result).isEqualTo(new WeatherSummary.Builder()
       .withAverageTemp(21)
       .withMaxTemp(24)
       .withMinTemp(20)
       .build());

   p.run();
}

Para testar o comportamento de gestão de janelas, também é possível usar a transformação Create para criar elementos com carimbos de data/hora, conforme mostrado no snippet de código a seguir:

private static final Duration WINDOW_DURATION = Duration.standardMinutes(3);

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void testWindowedData() {
    PCollection<String> input =
        p.apply(
            Create.timestamped(
                    TimestampedValue.of("a", new Instant(0L)),
                    TimestampedValue.of("a", new Instant(0L)),
                    TimestampedValue.of("b", new Instant(0L)),
                    TimestampedValue.of("c", new Instant(0L)),
                    TimestampedValue.of("c", new Instant(0L).plus(WINDOW_DURATION)))
                .withCoder(StringUtf8Coder.of()));

   PCollection<KV<String, Long>> windowedCount =
       input
           .apply(Window.into(FixedWindows.of(WINDOW_DURATION)))
           .apply(Count.perElement());

    PAssert.that(windowedCount)
        .containsInAnyOrder(
            // Output from first window
            KV.of("a", 2L),
            KV.of("b", 1L),
            KV.of("c", 1L),
            // Output from second window
            KV.of("c", 1L));

   p.run();
}

Testar pipelines de streaming

Os pipelines de streaming contêm suposições que definem como processar dados ilimitados. Essas suposições geralmente são sobre a pontualidade dos dados em condições reais e, portanto, têm impacto na correção, dependendo se as suposições são verdadeiras ou falsas. O ideal é que os testes de integração para pipelines de streaming incluam testes que simulem a natureza não determinista da chegada de dados de streaming.

Para ativar esses testes, o SDK do Apache Beam fornece a classe TestStream a fim de modelar os efeitos da velocidade do elemento (dados antecipados, pontuais ou atrasados) nos resultados do pipeline de dados. Use esses testes com a classe PAssert para verificar os resultados esperados.

TestStream é compatível com o Direct Runner e o Dataflow Runner. A amostra de código a seguir cria uma transformação TestStream:

final Duration WINDOW_DURATION = Duration.standardMinutes(3);

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void testDroppedLateData() {
   TestStream<String> input = TestStream.create(StringUtf8Coder.of())
      // Add elements arriving before the watermark
      .addElements(
         TimestampedValue.of("a", new Instant(0L)),
         TimestampedValue.of("a", new Instant(0L)),
         TimestampedValue.of("b", new Instant(0L)),
         TimestampedValue.of("c", new Instant(0L).plus(Duration.standardMinutes(3))))
         // Advance the watermark past the end of the window
      .advanceWatermarkTo(new Instant(0L).plus(WINDOW_DURATION).plus(Duration.standardMinutes(1)))
      // Add elements which will be dropped due to lateness
      .addElements(
         TimestampedValue.of("c", new Instant(0L)))
      // Advance the watermark to infinity which will close all windows
      .advanceWatermarkToInfinity();

      PCollection<KV<String, Long>> windowedCount =
          p.apply(input)
             .apply(Window.into(FixedWindows.of(WINDOW_DURATION)))
             .apply(Count.perElement());

   PAssert.that(windowedCount)
      .containsInAnyOrder(
          // Output from first window
          KV.of("a", 2L),
          KV.of("b", 1L),
          KV.of("c", 1L));

   p.run();
}

Para mais informações sobre TestStream, consulte Como testar pipelines ilimitados no Apache Beam. Para mais informações sobre como usar o SDK do Apache Beam para testes de unidade, consulte a documentação do Apache Beam.

Usar os serviços do Google Cloud em testes de integração

O Direct Runner pode ser integrado aos serviços do Google Cloud para que testes ad hoc no ambiente local e testes de integração do sistema usem o Pub/Sub, o BigQuery e outros serviços conforme necessário. Quando você usa o Direct Runner, o pipeline é executado como a conta de usuário configurada usando a ferramenta de linha de comando gcloud ou como uma conta de serviço que você especificou usando a variável de ambiente GOOGLE_APPLICATION_CREDENTIALS. Portanto, é preciso conceder permissões suficientes a essa conta para todos os recursos necessários antes de executar o pipeline. Para mais informações, consulte Segurança e permissões do Dataflow.

Para testes de integração totalmente locais, é possível usar emuladores locais para alguns serviços do Google Cloud. Os emuladores locais estão disponíveis para Pub/Sub e Bigtable.

Para testes de integração do sistema de pipelines de streaming, use o método setBlockOnRun (definido na interface DirectOptions) para que o Direct Runner execute o pipeline de maneira assíncrona. Caso contrário, a execução do pipeline bloqueará o processo pai de chamada (por exemplo, um script no pipeline de build) até que o pipeline seja interrompido manualmente. Se você executar o pipeline de maneira assíncrona, use a instância PipelineResult retornada para cancelar a execução do pipeline, conforme mostrado no exemplo de código a seguir:

public interface StreamingIntegrationTestOptions extends
   DirectOptions, StreamingOptions, MyOtherPipelineOptions {
   ...
}

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void testNonBlockingPipeline() {
    StreamingIntegrationTestOptions options =
        p.getOptions().as(StreamingIntegrationOptions.class);

    options.setBlockOnRun(false); // Set non-blocking pipeline execution
    options.setStreaming(true); // Set streaming mode

    p.apply(...); // Apply pipeline transformations

    PipelineResult result = p.run(); // Run the pipeline

    // Generate input, verify output, etc
    ...

    // Later on, cancel the pipeline using the previously returned
    result.cancel();
}

Testes completos

Testes completos verificam a operação correta do pipeline completo executando-o no Dataflow Runner em condições semelhantes à produção. Os testes verificam se a lógica de negócios funciona corretamente usando o Dataflow Runner e se o pipeline tem o desempenho esperado em cargas semelhantes às de produção. Normalmente, os testes completos são executados em um projeto dedicado do Google Cloud que foi designado como o ambiente de pré-produção.

Para testar o pipeline em diferentes escalas, use diferentes tipos de testes completos, por exemplo:

  • Execute testes completos de pequena escala com uma pequena proporção (como 1%) do conjunto de dados de teste para validar rapidamente a funcionalidade do pipeline no ambiente de pré-produção.
  • Execute testes completos em um conjunto de dados de teste completo para validar a funcionalidade do pipeline em volumes e condições de dados semelhantes à de produção.

Para pipelines de streaming, recomendamos que você execute pipelines de teste em paralelo com o pipeline de produção se eles puderem usar os mesmos dados. Isso permite comparar os resultados e o comportamento operacional, como escalonamento automático e desempenho.

Os testes completos ajudam a prever como o pipeline atenderá aos SLOs de produção. O ambiente de pré-produção testa o pipeline em condições semelhantes à produção. Em testes completos, os pipelines são executados usando o Dataflow Runner para processar conjuntos de dados de referência completos que correspondem (ou são muito semelhantes a) conjuntos de dados na produção.

Talvez não seja possível gerar dados sintéticos para testes que simulem com precisão dados reais. Para resolver esse problema, uma abordagem é usar extrações limpas de fontes de dados de produção para criar conjuntos de dados de referência, em que todos os dados confidenciais são desidentificados por meio de transformações apropriadas. Recomendamos usar a Proteção de dados sensíveis para essa finalidade. A Proteção de dados sensíveis pode detectar dados sensíveis em vários tipos de conteúdo e fontes de dados e aplicar várias técnicas de desidentificação, incluindo encobrimento, mascaramento, criptografia de preservação de formato e mudança de data.

Diferenças em testes completos para pipelines em lote e de streaming

Antes de executar um teste completo em um grande conjunto de dados de teste, faça um teste com uma porcentagem menor desses dados, por exemplo, um por cento, e verifique o comportamento esperado em um tempo mais curto. Assim como nos testes de integração que usam o Direct Runner, é possível usar PAssert em objetos PCollection ao executar pipelines usando o Dataflow Runner. Para saber mais sobre PAssert, consulte a seção Testes de unidade nesta página.

Dependendo do caso de uso, a verificação de saídas muito grandes de testes completos pode ser inviável, cara ou complicada. Nesse caso, é possível verificar amostras representativas do conjunto de resultados de saída. Por exemplo, é possível usar o BigQuery para testar e comparar linhas de saída a um conjunto de dados de referência dos resultados esperados.

Para pipelines de streaming, a simulação de condições realistas de streaming com dados sintéticos pode complicada. Uma maneira comum de fornecer dados de streaming para testes completos é integrar testes a fontes de dados de produção. Se você estiver usando o Pub/Sub como fonte de dados, poderá ativar um fluxo de dados separado para testes completos usando assinaturas adicionais de tópicos existentes. Em seguida, é possível comparar os resultados de pipelines diferentes que consomem os mesmos dados, o que é útil para comparar pipelines candidatos a outros pipelines de pré-produção e produção.

O diagrama a seguir mostra como esse método permite que um pipeline de produção e um pipeline de teste sejam executados em paralelo em diferentes ambientes de implantação.

Executar um pipeline de teste em paralelo a um pipeline de produção usando uma única origem de streaming do Pub/Sub.

No diagrama, os dois pipelines leem o mesmo tópico do Pub/Sub, mas usam assinaturas separadas. Isso permite que os dois pipelines processem os mesmos dados de maneira independente e permite comparar os resultados. O pipeline de teste usa uma conta de serviço separada do projeto de produção e, portanto, evita o uso da cota de assinante do Pub/Sub no projeto de produção.

Ao contrário dos pipelines em lote, os pipelines de streaming continuam em execução até serem cancelados explicitamente. Nos testes completos, você precisa decidir se quer deixar o pipeline em execução, talvez até a execução do próximo teste completo, ou cancelar o pipeline em um momento que represente a conclusão do teste. Assim, é possível inspecionar os resultados.

O tipo de dados de teste que você usa influencia essa decisão. Por exemplo, se você usar um conjunto vinculado de dados de teste fornecido ao pipeline de streaming, poderá cancelar o pipeline quando todos os elementos tiverem concluído o processamento. Como alternativa, se você usar uma fonte de dados real (como um tópico do Pub/Sub usado na produção) ou gerar dados de teste continuamente, é preciso manter os pipelines de teste em execução por um período maior. A última opção permite comparar o comportamento com o ambiente de produção ou até mesmo com outros pipelines de teste.