Nesta página, apresentamos as práticas recomendadas para desenvolver e testar seu pipeline do Dataflow.
Visão geral
A forma como o código do pipeline é implementado tem uma influência significativa no desempenho do pipeline na produção. Para ajudar você a criar um código de pipeline que funcione corretamente e com eficiência, este documento explica os seguintes itens:
- Executores de pipeline que oferecem suporte à execução de código nos diferentes estágios de desenvolvimento e implantação.
- Ambientes de implantação que permitem executar pipelines durante o desenvolvimento, teste, pré-produção e produção.
- Código de pipeline de código aberto e modelos prontos para uso ou que podem ser usados como base de novos pipelines para acelerar o desenvolvimento de código.
- Práticas recomendadas de codificação para desenvolver o pipeline a fim de melhorar a observabilidade e o desempenho do pipeline. Muitas dessas práticas são aplicáveis à programação usando o SDK do Apache Beam (os exemplos se concentram no Java) e não são específicas do Dataflow. No entanto, em muitos casos, o Dataflow fornece recursos que complementam essas práticas de programação para melhorar a prontidão da produção.
- Uma abordagem de práticas recomendadas para testar o código do pipeline. Primeiro, este documento fornece uma visão geral que inclui o escopo e o relacionamento de diferentes tipos de teste, como testes de unidade, testes de integração e testes completos. Em segundo lugar, cada tipo de teste é explorado em detalhes, incluindo métodos para criar e integrar com dados de teste e quais executores de pipeline usar para cada teste.
Executores de pipelines
Durante o desenvolvimento e os testes, você usa diferentes executores do Apache Beam para gerar o código do pipeline. O SDK do Apache Beam fornece o Direct Runner, um executor direto para desenvolvimento e teste locais. Suas ferramentas de automação de lançamento também podem usar o Direct Runner para testes de unidade e integração. Por exemplo, é possível usar o Direct Runner no pipeline de integração contínua (CI).
Os pipelines implantados no Dataflow usam o Dataflow Runner, que executa o pipeline em ambientes semelhantes a ambientes de produção. Além disso, é possível usar o Dataflow Runner para testes de desenvolvimento ad hoc e testes de pipeline completos.
Embora esta página se concentre na execução de pipelines criados com o SDK do Apache Beam para Java, o Dataflow também oferece suporte aos pipelines do Apache Beam que foram desenvolvidos usando Python e Go. Os SDKs do Apache Beam para Java, Python e Go estão com disponibilidade geral no Dataflow. Os desenvolvedores de SQL também podem usar a SQL do Apache Beam para criar pipelines que usam dialetos de SQL conhecidos.
Configurar um ambiente de implantação
Para separar usuários, dados, códigos e outros recursos em diferentes estágios de desenvolvimento, crie ambientes de implantação. Sempre que possível, para fornecer ambientes isolados nos diferentes estágios de desenvolvimento do pipeline, use projetos do Google Cloud separados.
As seções a seguir descrevem um conjunto típico de ambientes de implantação.
Ambiente local
O ambiente local é a estação de trabalho de um desenvolvedor. Para desenvolvimento e testes rápidos, use o Direct Runner para executar o código do pipeline localmente.
Os pipelines executados localmente usando o Direct Runner podem interagir com recursos remotos do Google Cloud, como tópicos do Pub/Sub ou tabelas do BigQuery. Dê aos desenvolvedores individuais projetos separados do Google Cloud para que eles tenham um sandbox para testes ad hoc com os serviços do Google Cloud.
Alguns serviços do Google Cloud, como o Pub/Sub e o Bigtable, fornecem emuladores para desenvolvimento local. É possível usar esses emuladores com o Direct Runner para ativar o desenvolvimento e o teste locais completos.
Ambiente de sandbox
O ambiente de sandbox é um projeto do Google Cloud que fornece aos desenvolvedores acesso aos serviços do Google Cloud durante o desenvolvimento do código. Os desenvolvedores de pipeline podem compartilhar um projeto do Google Cloud com outros desenvolvedores ou usar os próprios projetos individuais. O uso de projetos individuais reduz a complexidade do planejamento em relação ao uso de recursos compartilhados e ao gerenciamento de cotas.
Os desenvolvedores usam o ambiente de sandbox para a execução ad hoc do pipeline usando o Dataflow Runner. O ambiente sandbox é útil para depurar e testar códigos em um executor de produção durante a fase de desenvolvimento de código. Por exemplo, a execução ad-hoc do pipeline permite que os desenvolvedores:
- observem o efeito das alterações do código no comportamento de escalonamento;
- entendam as possíveis diferenças entre o comportamento do Direct Runner e do Dataflow Runner;
- entendam como o Dataflow aplica as otimizações de gráfico.
Em testes ad hoc, os desenvolvedores podem implantar códigos do ambiente local para executar o Dataflow no ambiente de sandbox.
Ambiente de pré-produção
O ambiente de pré-produção serve para as fases de desenvolvimento que precisam ser executadas em condições de produção, como testes completos. Use um projeto separado para o ambiente de pré-produção e o configure para ser o mais semelhante possível ao de produção. Da mesma forma, para permitir testes completos em escala de produção, torne as cotas de projetos do Google Cloud para o Dataflow e outros serviços o mais semelhante possível ao ambiente de produção.
Dependendo dos requisitos, é possível separar ainda mais a pré-produção em vários ambientes. Por exemplo, um ambiente de controle de qualidade pode dar suporte ao trabalho de analistas de qualidade para testar objetivos de nível de serviço (SLOs), como precisão de dados, atualização e desempenho em diferentes condições de carga de trabalho.
Os testes completos incluem integração com fontes de dados e coletores dentro do escopo dos testes. Considere como disponibilizá-los no ambiente de pré-produção. É possível armazenar dados de teste no próprio ambiente de pré-produção. Por exemplo, os dados de teste são armazenados em um bucket do Cloud Storage com seus dados de entrada. Em outros casos, os dados de teste podem ser originados fora do ambiente de pré-produção, como um tópico do Pub/Sub por meio de uma assinatura separada que está no ambiente de produção. Para pipelines de streaming, também é possível executar testes completos usando dados gerados, por exemplo, com o Gerador de dados de streaming do Dataflow, para emular características e volumes de dados semelhantes à produção.
Para pipelines de streaming, use o ambiente de pré-produção para testar atualizações do pipeline antes que qualquer alteração seja feita na produção. É importante testar e verificar os procedimentos de atualização para pipelines de streaming, especialmente se você precisar coordenar várias etapas, como ao executar pipelines paralelos para evitar inatividade.
Ambiente de produção
O ambiente de produção é um projeto dedicado do Google Cloud. A entrega contínua copia os artefatos de implantação para o ambiente de produção quando todos os testes completos são aprovados.
Práticas recomendadas de desenvolvimento
Esta seção aborda as práticas recomendadas de codificação e desenvolvimento. Muitas dessas práticas complementam e melhoram aspectos do desenvolvimento e da operação do pipeline, como melhorar a produtividade do desenvolvedor, promover a testabilidade do pipeline, aumentar o desempenho e permitir insights mais detalhados com monitoramento.
Antes de iniciar o desenvolvimento, configure ambientes de implantação que ofereçam suporte ao seu ciclo de vida de desenvolvimento, teste e entrega.
Usar modelos fornecidos pelo Google
Para acelerar o desenvolvimento do pipeline, verifique se o Google fornece um modelo do Dataflow. Alguns modelos permitem adicionar lógica personalizada como uma etapa do pipeline. Por exemplo, a assinatura do Pub/Sub para o modelo do BigQuery fornece um parâmetro para executar uma função JavaScript definida pelo usuário (UDF, na sigla em inglês) que é armazenada no Cloud Storage. Os modelos fornecidos pelo Google são de código aberto sob a licença Apache 2.0, para que você possa usá-los como base para novos pipelines. Os modelos também são úteis como exemplos de código para referência.
Criar bibliotecas de transformações reutilizáveis
O modelo de programação do Apache Beam unifica o processamento de dados em lote e streaming, o que possibilita a reutilização de transformações. Criar uma biblioteca compartilhada de transformações comuns promove a reutilização, a capacidade de teste e a propriedade do código por diferentes equipes.
Considere os dois exemplos de código Java a seguir, que leem eventos de pagamento. O primeiro é de uma fonte ilimitada do Pub/Sub:
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options)
// Initial read transform
PCollection<PaymentEvent> payments =
p.apply("Read from topic",
PubSubIO.readStrings().withTimestampAttribute(...).fromTopic(...))
.apply("Parse strings into payment events",
ParDo.of(new ParsePaymentEventFn()));
O segundo é proveniente de uma fonte limitada de banco de dados relacional:
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
PCollection<PaymentEvent> payments =
p.apply(
"Read from database table",
JdbcIO.<PaymentEvent>read()
.withDataSourceConfiguration(...)
.withQuery(...)
.withRowMapper(new RowMapper<PaymentEvent>() {
...
}));
Supondo que ambos os pipelines executem o mesmo processamento, eles podem usar as mesmas transformações de uma biblioteca compartilhada para as etapas de processamento restantes. A implementação das práticas recomendadas de reutilização de código varia de acordo com a linguagem de programação e a ferramenta de build. Por exemplo, se você usa o Maven, é possível separar o código de transformação no próprio módulo. Em seguida, você pode incluir o módulo como um submódulo em projetos com vários módulos maiores para pipelines diferentes, conforme mostrado no exemplo de código a seguir:
// Reuse transforms across both pipelines
payments
.apply("ValidatePayments", new PaymentTransforms.ValidatePayments(...))
.apply("ProcessPayments", new PaymentTransforms.ProcessPayments(...))
...
Para mais informações, consulte a documentação do Apache Beam para conferir práticas recomendadas na gravação de códigos de usuário para transformações do Apache Beam e o guia de estilo recomendado para PTransforms.
Usar filas de mensagens inativas para tratamento de erros
O pipeline pode encontrar situações em que não é possível processar elementos. Isso pode ocorrer por diferentes motivos, mas uma causa comum são problemas de dados. Por exemplo, um elemento que contém JSON mal formatado pode causar falhas de análise.
Nessa situação, uma abordagem é capturar uma exceção no método
DoFn.ProcessElement
. No bloco de exceções, registre o erro e remova o elemento.
No entanto, isso faz com que os dados sejam perdidos e impede que eles sejam
inspecionados posteriormente para processamento manual ou solução de problemas.
Uma abordagem melhor é usar um padrão chamado fila de mensagens inativas (ou
arquivo de mensagens inativas). Você captura exceções no método DoFn.ProcessElement
e registra
erros como faria normalmente. Em vez de remover o elemento com falha,
use saídas de ramificação para gravá-lo em um objeto PCollection
separado. Esses elementos podem ser gravados em um coletor de dados para inspeção
e processamento posteriores usando uma transformação separada.
O exemplo de código Java a seguir mostra como implementar o padrão de fila de mensagens inativas:
TupleTag<Output> successTag = new TupleTag<>() {};
TupleTag<Input> deadLetterTag = new TupleTag<>() {};
PCollection<Input> input = /* ... */;
PCollectionTuple outputTuple =
input.apply(ParDo.of(new DoFn<Input, Output>() {
@Override
void processElement(ProcessContext c) {
try {
c.output(process(c.element()));
} catch (Exception e) {
LOG.severe("Failed to process input {} -- adding to dead-letter file",
c.element(), e);
c.sideOutput(deadLetterTag, c.element());
}
}).withOutputTags(successTag, TupleTagList.of(deadLetterTag)));
// Write the dead-letter inputs to a BigQuery table for later analysis
outputTuple.get(deadLetterTag)
.apply(BigQueryIO.write(...));
// Retrieve the successful elements...
PCollection<Output> success = outputTuple.get(successTag);
// and continue processing ...
É possível usar o Cloud Monitoring para aplicar diferentes políticas de monitoramento e alerta na fila de mensagens inativas do pipeline. Por exemplo, é possível visualizar o número e o tamanho dos elementos processados pela transformação de mensagens inativas e configurar os alertas para serem acionados se determinadas condições de limite forem atendidas.
Gerenciar mutações de esquema
É possível processar dados que tenham esquemas inesperados (mas válidos) usando um padrão de mensagens inativas,
que grava elementos com falha em um objeto PCollection
separado.
No entanto, em alguns casos, é melhor processar automaticamente os elementos
que refletem um esquema alterado como elementos válidos. Por exemplo, se o esquema
de um elemento refletir uma mutação, como a adição de novos campos, será possível adaptar o
esquema do coletor de dados para acomodar mutações.
A mutação automática de esquema depende da abordagem de ramificação e da saída usada pelo padrão de mensagens inativas. No entanto, nesse caso, ele aciona uma transformação que modifica o esquema de destino sempre que esquemas aditivos forem encontrados. Para ver um exemplo dessa abordagem, consulte Como processar a modificação de esquemas JSON em um pipeline de streaming, com a Square Enix no blog do Google Cloud.
Escolha corretamente entre entradas secundárias ou CoGroupByKey
para junções
A junção de conjuntos de dados é um caso de uso comum para pipelines de dados.
As entradas secundárias
fornecem uma maneira flexível de resolver problemas comuns de processamento de dados, como
enriquecimento de dados e pesquisas com chaves. Ao contrário dos objetos PCollection
, as entradas secundárias também
são mutáveis e podem ser determinadas no ambiente de execução. Por exemplo, os valores em uma
entrada secundária podem ser calculados por outra ramificação no pipeline ou determinados
chamando um serviço remoto.
O Dataflow é compatível com entradas secundárias usando a persistência de dados no armazenamento permanente (semelhante a um disco compartilhado), o que disponibiliza a entrada secundária completa para todos os workers. Os tamanhos de entrada secundária podem ser muito grandes e não caberem na memória do worker. A leitura de uma entrada secundária grande pode causar problemas de desempenho se os workers precisarem ler constantemente do armazenamento permanente.
A transformação
CoGroupByKey
é uma
transformação principal do Apache Beam
que combina (nivela) vários objetos PCollection
e elementos de grupos que
tenham uma chave comum. Diferente de uma entrada secundária, que disponibiliza todos os dados de entrada secundária
para cada worker, a CoGroupByKey
executa uma operação de embaralhamento (agrupamento)
para distribuir dados entre os workers. Portanto, CoGroupByKey
é ideal quando os objetos
PCollection
que você quer mesclar são muito grandes e não se encaixam na memória
do worker.
Siga estas diretrizes para decidir se quer usar entradas secundárias ou
CoGroupByKey
:
- Use entradas secundárias quando um dos objetos
PCollection
que você está mesclando for desproporcionalmente menor que o outro e o objetoPCollection
menor se encaixar na memória do worker. Armazenar toda a entrada secundária em cache na memória torna a busca de elementos rápida e eficiente. - Use
CoGroupByKey
se você precisar buscar uma grande proporção de um objetoPCollection
que exceda significativamente a memória do worker. - Use entradas secundárias quando tiver um objeto
PCollection
que deva ser unido várias vezes no pipeline. Em vez de usar várias transformaçõesCoGroupByKey
, é possível criar uma única entrada secundária que possa ser reutilizada por várias transformaçõesParDo
.
Para mais informações, consulte Resolver problemas de falta de memória no Dataflow.
Minimizar operações caras por elemento
Uma instância DoFn
processa lotes de elementos chamados
pacotes
(unidades atômicas de trabalho que consistem em zero ou mais
elementos). Os elementos individuais são processados pelo método
DoFn.ProcessElement
,
que é executado para cada elemento. Como o método
DoFn.ProcessElement
é chamado para cada elemento, qualquer operação demorada ou computacionalmente
cara invocada por ele faz com que essas operações sejam
executadas para cada elemento que ele processa.
Se você precisar realizar operações caras apenas uma vez para um lote de elementos,
inclua essas operações nos métodos
DoFn.Setup
e
DoFn.StartBundle
em vez de em DoFn.ProcessElement
. Os exemplos incluem:
Análise de um arquivo de configuração que controla algum aspecto do comportamento da instância
DoFn
. Invoque essa ação apenas uma vez, quando a instânciaDoFn
for inicializada usando o métodoDoFn.Setup
.Ao instanciar um cliente de curta duração que é reutilizado em todos os elementos de um pacote, por exemplo, quando todos os elementos do pacote são enviados por uma única conexão de rede. Invoque essa ação uma vez por pacote usando o método
DoFn.StartBundle
.
Limitar os tamanhos de lotes e as chamadas simultâneas a serviços externos
Ao chamar serviços externos, é possível reduzir as despesas por chamada usando a transformação
GroupIntoBatches
para criar lotes de elementos de um tamanho especificado. O envio em lote envia
elementos para um serviço externo como um payload em vez de
individualmente.
Em combinação com lotes, é possível limitar o número máximo de chamadas paralelas (simultâneas) ao serviço externo escolhendo as chaves apropriadas para particionar os dados recebidos. O número de partições determina o carregamento em paralelo máximo. Por exemplo, se cada elemento receber a mesma chave, uma transformação downstream para chamar o serviço externo não será executada em paralelo.
Considere uma das seguintes abordagens para produzir chaves para elementos:
- Escolha um atributo do conjunto de dados para usar como chaves de dados, como IDs de usuário.
- Gere chaves de dados para dividir elementos de maneira aleatória em um número fixo de
partições, em que o número possível de chaves-valor determina o número
de partições. Você precisa criar partições suficientes para o paralelismo, e
cada partição precisa ter elementos suficientes para que
GroupIntoBatches
seja útil.
No exemplo de código Java a seguir, mostramos como dividir aleatoriamente elementos em 10 partições:
// PII or classified data which needs redaction.
PCollection<String> sensitiveData = ...;
int numPartitions = 10; // Number of parallel batches to create.
PCollection<KV<Long, Iterable<String>>> batchedData =
sensitiveData
.apply("Assign data into partitions",
ParDo.of(new DoFn<String, KV<Long, String>>() {
Random random = new Random();
@ProcessElement
public void assignRandomPartition(ProcessContext context) {
context.output(
KV.of(randomPartitionNumber(), context.element()));
}
private static int randomPartitionNumber() {
return random.nextInt(numPartitions);
}
}))
.apply("Create batches of sensitive data",
GroupIntoBatches.<Long, String>ofSize(100L));
// Use batched sensitive data to fully utilize Redaction API
// (which has a rate limit but allows large payloads)
batchedData
.apply("Call Redaction API in batches", callRedactionApiOnBatch());
Identificar problemas de desempenho causados por etapas com a combinação inadequada
O Dataflow cria um gráfico de etapas que representa o pipeline, com base nas transformações e nos dados usados para criá-lo. Isso é chamado de gráfico de execução de pipeline.
Quando você implanta o pipeline, o Dataflow pode modificar
o gráfico de execução do pipeline para melhorar o desempenho. Por exemplo, o Dataflow
pode unir algumas operações, um processo conhecido como
otimização de fusão,
para evitar o impacto de desempenho e custo de cada gravação de objeto intermediário
PCollection
no pipeline.
Em alguns casos, o Dataflow pode determinar incorretamente a melhor forma de fundir operações no pipeline, o que pode limitar a capacidade do serviço do Dataflow de usar todos os workers disponíveis. Nesses casos, é possível evitar que algumas operações sejam mescladas.
Considere o exemplo de código do Apache Beam a seguir. Uma transformação
GenerateSequence
cria um pequeno objeto PCollection
limitado, que depois será processado
por duas transformações ParDo
downstream.
import com.google.common.math.LongMath;
...
public class FusedStepsPipeline {
final class FindLowerPrimesFn extends DoFn<Long, String> {
@ProcessElement
public void processElement(ProcessContext c) {
Long n = c.element();
if (n > 1) {
for (long i = 2; i < n; i++) {
if (LongMath.isPrime(i)) {
c.output(Long.toString(i));
}
}
}
}
}
public static void main(String[] args) {
Pipeline p = Pipeline.create(options);
PCollection<Long> sequence = p.apply("Generate Sequence",
GenerateSequence
.from(0)
.to(1000000));
// Pipeline branch 1
sequence.apply("Find Primes Less-than-N",
ParDo.of(new FindLowerPrimesFn()));
// Pipeline branch 2
sequence.apply("Increment Number",
MapElements.via(new SimpleFunction<Long, Long>() {
public Long apply(Long n) {
return ++n;
}
}));
p.run().waitUntilFinish();
}
}
A transformação Find Primes Less-than-N
pode ser cara em termos de computação e provavelmente
será executada lentamente para números grandes. Por outro lado, espera-se que a transformação
Increment Number
seja concluída rapidamente.
O diagrama a seguir mostra uma representação gráfica do pipeline na interface de monitoramento do Dataflow.
O monitoramento do job usando a
interface de monitoramento do Dataflow
mostra a mesma taxa de processamento lenta para as duas transformações, ou seja, 13 elementos
por segundo. Espera-se que a transformação Increment Number
processe
elementos rapidamente, mas parece que ela está vinculada à mesma taxa de
processamento que Find Primes Less-than-N
.
O motivo é que o Dataflow mesclava as etapas em uma única
fase, o que as impedia de serem executadas de maneira independente. É possível usar o
seguinte comando gcloud
:
gcloud dataflow jobs describe --full job-id --format json
Na saída resultante, as etapas combinadas são descritas no objeto
ExecutionStageSummary
na matriz
ComponentTransform
:
...
"executionPipelineStage": [
{
"componentSource": [
...
],
"componentTransform": [
{
"name": "s1",
"originalTransform": "Generate Sequence/Read(BoundedCountingSource)",
"userName": "Generate Sequence/Read(BoundedCountingSource)"
},
{
"name": "s2",
"originalTransform": "Find Primes Less-than-N",
"userName": "Find Primes Less-than-N"
},
{
"name": "s3",
"originalTransform": "Increment Number/Map",
"userName": "Increment Number/Map"
}
],
"id": "S01",
"kind": "PAR_DO_KIND",
"name": "F0"
}
...
Nesse cenário, a transformação Find Primes Less-than-N
é a etapa lenta. Portanto,
a quebra da mesclagem antes dessa etapa é uma estratégia apropriada. Um método para
separar as etapas é inserir uma transformação
GroupByKey
e realizar o desagrupamento antes da etapa, conforme mostrado no exemplo de código Java
a seguir:
sequence
.apply("Map Elements", MapElements.via(new SimpleFunction<Long, KV<Long, Void>>() {
public KV<Long, Void> apply(Long n) {
return KV.of(n, null);
}
}))
.apply("Group By Key", GroupByKey.<Long, Void>create())
.apply("Emit Keys", Keys.<Long>create())
.apply("Find Primes Less-than-N", ParDo.of(new FindLowerPrimesFn()));
Também é possível combinar essas etapas em uma transformação composta reutilizável.
Depois de separar a etapa, ao executar o pipeline, você verá que Increment Number
será concluído em segundos e que a transformação Find Primes Less-than-N
,
muito mais longa, será executada em uma etapa separada.
Este exemplo aplica uma operação de agrupamento e desagrupamento para propagar as etapas.
É possível usar outras abordagens para outras circunstâncias. Nesse caso, o processamento
da saída duplicada não é um problema, dada a saída consecutiva da
transformação
GenerateSequence
.
Objetos KV
com chaves duplicadas são têm a duplicação eliminada para uma única chave nas
transformações de agrupamento
(GroupByKey
)
e de separação
(Keys
). Para reter cópias após as operações de agrupamento e desagrupamento,
é possível criar pares KV
usando uma chave aleatória e a entrada original como o valor,
realizar o agrupamento usando a chave aleatória e emitir os valores de cada chave como
saída.
Usar métricas do Apache Beam para coletar insights do pipeline
As métricas do Apache Beam são uma classe de utilitários que produzem várias métricas e geram relatórios das propriedades de um pipeline em execução. Quando você usa o Cloud Monitoring, as métricas do Apache Beam ficam disponíveis como métricas personalizadas do Cloud Monitoring.
O snippet Java a seguir é um exemplo de métricas
Counter
usadas em uma subclasse DoFn
.
final TupleTag<String> errorTag = new TupleTag<String>(){};
final TupleTag<MockObject> successTag = new TupleTag<MockObject>(){};
final class ParseEventFn extends DoFn<String, MyObject> {
private final Counter malformedCounter = Metrics.counter(ParseEventFn.class, "malformedJson");
private final Counter emptyCounter = Metrics.counter(ParseEventFn.class, "emptyPayload");
private Gson gsonParser;
@Setup
public setup() {
gsonParser = new Gson();
}
@ProcessElement
public void processElement(ProcessContext c) {
try {
MyObject myObj = gsonParser.fromJson(c.element(), MyObject.class);
if (myObj.getPayload() != null) {
// Output the element if non-empty payload
c.output(successTag, myObj);
}
else {
// Increment empty payload counter
emptyCounter.inc();
}
}
catch (JsonParseException e) {
// Increment malformed JSON counter
malformedCounter.inc();
// Output the element to dead-letter queue
c.output(errorTag, c.element());
}
}
}
O código de exemplo usa dois contadores. Um contador rastreia falhas de análise JSON
(malformedCounter
) e o outro monitora se a mensagem JSON é
válida, mas contém um payload vazio (emptyCounter
). No Cloud Monitoring,
os nomes das métricas personalizadas são custom.googleapis.com/dataflow/malformedJson
e
custom.googleapis.com/dataflow/emptyPayload
. Use as métricas personalizadas
para criar visualizações e políticas de alertas no Cloud Monitoring.
Testar seu pipeline
No desenvolvimento de software, testes de unidade, testes de integração e testes completos são tipos comuns de teste de software. Esses tipos de teste também são aplicáveis a pipelines de dados.
O SDK do Apache Beam fornece funcionalidades para ativar esses testes. O ideal é que cada tipo de teste seja direcionado a um ambiente de implantação diferente. O diagrama a seguir ilustra como os testes de unidade, os testes de integração e os testes completos se aplicam a diferentes partes do pipeline e dos dados.
O diagrama mostra o escopo de diferentes testes e como eles se relacionam com
transformações (subclasses DoFn
e PTransform
), pipelines, fontes de dados e
coletores de dados.
As seções a seguir descrevem como vários testes formais de software se aplicam a pipelines de dados usando o Dataflow. Ao continuar a leitura desta seção, consulte o diagrama acima para entender como os diferentes tipos de teste estão relacionados.
Amostragem de dados
Para observar os dados em cada etapa de um pipeline do Dataflow, ative a amostragem de dados durante o teste. Isso permite visualizar as saídas das transformações para garantir que estejam corretas.
Testes de unidade
Os testes de unidade avaliam o funcionamento correto de subclasses DoFn
e de
transformações compostas
(subclasse PTransform
) comparando a saída dessas transformações com um
conjunto verificado de entradas e saídas de dados. Normalmente, os desenvolvedores podem executar esses
testes no ambiente local. Eles também podem ser executados automaticamente na
automação de teste de unidade usando integração contínua (CI) no
ambiente de criação.
O Direct Runner executa testes de unidade com um subconjunto menor de dados de teste de referência que se concentram em testar a lógica de negócios das transformações. Os dados de teste precisam ser pequenos o suficiente para caber na memória local da máquina que executa o teste.
O SDK do Apache Beam fornece uma regra JUnit chamada
TestPipeline
para transformações individuais de teste de unidade (subclasses DoFn
), transformações compostas
(subclasses PTransform
) e pipelines inteiros. É possível usar TestPipeline
em um
executor de pipeline do Apache Beam, como o Direct Runner ou o
Dataflow Runner, para aplicar declarações no conteúdo de
objetos PCollection
usando
PAssert
,
conforme mostrado no snippet de código a seguir de uma
classe de teste JUnit:
@Rule
public final transient TestPipeline p = TestPipeline.create();
@Test
@Category(NeedsRunner.class)
public void myPipelineTest() throws Exception {
final PCollection<String> pcol = p.apply(...)
PAssert.that(pcol).containsInAnyOrder(...);
p.run();
}
Testes de unidade para transformações individuais
Ao fatorar o código em transformações reutilizáveis (por exemplo, como classes aninhadas de nível superior ou estáticas), é possível criar testes segmentados para diferentes partes do pipeline. Além dos benefícios do teste, as transformações reutilizáveis promovem a manutenção e a reutilização de códigos, encapsulando a lógica de negócios do pipeline naturalmente em partes de componentes. Por outro lado, testar partes individuais do pipeline pode ser difícil quando o pipeline usa classes internas anônimas para implementar transformações.
O snippet Java a seguir mostra a implementação de transformações como classes internas anônimas, o que não permite testes facilmente.
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options)
PCollection<Integer> output =
p.apply("Read from text", TextIO.Read.from(...))
.apply("Split words", ParDo.of(new DoFn() {
// Untestable anonymous transform 1
}))
.apply("Generate anagrams", ParDo.of(new DoFn() {
// Untestable anonymous transform 2
}))
.apply("Count words", Count.perElement());
Compare o exemplo anterior com o seguinte, em que as classes internas
anônimas foram refatoradas em subclasses DoFn
concretas. É possível criar
testes de unidade individuais para cada subclasse DoFn
concreta que compõe o
pipeline completo.
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options)
PCollection<Integer> output =
p.apply("Read from text", TextIO.Read.from(...))
.apply("Split words", ParDo.of(new SplitIntoWordsFn()))
.apply("Generate anagrams", ParDo.of(new GenerateAnagramsFn()))
.apply("Count words", Count.perElement());
O teste de cada subclasse DoFn
é semelhante ao teste de unidade de um pipeline
em lote que contém uma única transformação. Use a transformação Create
para
criar um objeto PCollection
de dados de teste e transmita-o para o objeto
DoFn
. Use PAssert
para declarar que o conteúdo do objeto PCollection
está correto. O exemplo de código Java a seguir usa a classe PAssert
para
verificar o formato de saída correto.
@Rule
public final transient TestPipeline p = TestPipeline.create();
@Test
@Category(NeedsRunner.class)
public void testGenerateAnagramsFn() {
// Create the test input
PCollection<String> words = p.apply(Create.of("friend"));
// Test a single DoFn using the test input
PCollection<String> anagrams =
words.apply("Generate anagrams", ParDo.of(new GenerateAnagramsFn()));
// Assert correct output from
PAssert.that(anagrams).containsInAnyOrder(
"finder", "friend", "redfin", "refind");
p.run();
}
Testes de integração
Os testes de integração verificam o funcionamento correto de todo o pipeline. Considere os seguintes tipos de teste de integração:
- Um teste de integração de transformação que avalia a funcionalidade integrada de todas as transformações individuais que compõem o pipeline de dados. Pense nos testes de integração de transformação como um teste de unidade para todo o pipeline, exceto a integração com coletores e fontes de dados externas. O SDK do Beam fornece métodos para fornecer dados de teste ao pipeline de dados e para verificar os resultados do processamento. O Direct Runner é usado para executar testes de integração de transformação.
Um teste de integração do sistema que avalia a integração do pipeline de dados com coletores e fontes de dados em tempo real. Para que o pipeline se comunique com sistemas externos, você precisa configurar os testes com as credenciais apropriadas para acessar serviços externos. Os pipelines de streaming são executados indefinidamente. Portanto, você precisa decidir quando e como interromper o pipeline em execução. Ao usar o Direct Runner para executar testes de integração do sistema, você verifica rapidamente a integração entre o pipeline e outros sistemas sem precisar enviar um job do Dataflow e aguardar a conclusão dele.
Projete testes de transformação e integração de sistemas para fornecer detecção de defeitos e feedback rápidos sem atrasar a produtividade do desenvolvedor. Para testes de longa duração, como os executados como jobs do Dataflow, use um teste completo que seja executado com menos frequência.
Pense em um pipeline de dados como uma ou mais transformações relacionadas. Portanto,
é possível criar uma transformação composta de encapsulamento para o pipeline e
usar
TestPipeline
para realizar um teste de integração de todo o pipeline. De acordo com sua preferência quanto ao
teste do pipeline no modo em lote ou em streaming, forneça dados de teste
usando as transformações
Create
ou
TestStream
.
Usar dados de teste para testes de integração
No ambiente de produção, o pipeline provavelmente se integra a diferentes fontes de dados e coletores. No entanto, para testes de unidade e testes de integração de transformação, concentre-se na verificação da lógica de negócios do código do pipeline, fornecendo entradas de teste e verificando a saída diretamente. Além de simplificar os testes, essa abordagem permite isolar problemas específicos do pipeline daqueles que podem ser causados por fontes de dados e coletores.
Testar pipelines em lote
Para pipelines em lote, use a transformação Create
para criar um objeto PCollection
dos dados de teste de entrada com base em uma coleção padrão na memória, como um objeto Java
List
. Usar a transformação Create
é apropriado quando os dados de
teste são pequenos o suficiente para serem incluídos no código. Em seguida, use PAssert
nos
objetos PCollection
de saída para determinar a exatidão do código do pipeline.
Essa abordagem é compatível com o Direct Runner e com o
Dataflow Runner.
O snippet de código Java a seguir mostra declarações sobre objetos PCollection
de saída de uma transformação composta que inclui algumas ou todas as transformações
individuais que compõem um pipeline (WeatherStatsPipeline
). A abordagem é
semelhante ao teste de unidade de transformações individuais em um pipeline.
private class WeatherStatsPipeline extends
PTransform<PCollection<Integer>, PCollection<WeatherSummary>> {
@Override
public PCollection<WeatherSummary> expand(PCollection<Integer> input) {
// Pipeline transforms …
}
}
@Rule
public final transient TestPipeline p = TestPipeline.create();
@Test
@Category(NeedsRunner.class)
public void testWeatherPipeline() {
// Create test input consisting of temperature readings
PCollection<Integer> tempCelsius =
p.apply(Create.of(24, 22, 20, 22, 21, 21, 20));
// CalculateWeatherStats calculates the min, max, and average temperature
PCollection<WeatherSummary> result =
tempCelsius.apply("Calculate weather statistics", new WeatherStatsPipeline());
// Assert correct output from CalculateWeatherStats
PAssert.thatSingleton(result).isEqualTo(new WeatherSummary.Builder()
.withAverageTemp(21)
.withMaxTemp(24)
.withMinTemp(20)
.build());
p.run();
}
Para testar o comportamento de gestão de janelas, também é possível usar a transformação Create
para criar
elementos com carimbos de data/hora, conforme mostrado no snippet de código a seguir:
private static final Duration WINDOW_DURATION = Duration.standardMinutes(3);
@Rule
public final transient TestPipeline p = TestPipeline.create();
@Test
@Category(NeedsRunner.class)
public void testWindowedData() {
PCollection<String> input =
p.apply(
Create.timestamped(
TimestampedValue.of("a", new Instant(0L)),
TimestampedValue.of("a", new Instant(0L)),
TimestampedValue.of("b", new Instant(0L)),
TimestampedValue.of("c", new Instant(0L)),
TimestampedValue.of("c", new Instant(0L).plus(WINDOW_DURATION)))
.withCoder(StringUtf8Coder.of()));
PCollection<KV<String, Long>> windowedCount =
input
.apply(Window.into(FixedWindows.of(WINDOW_DURATION)))
.apply(Count.perElement());
PAssert.that(windowedCount)
.containsInAnyOrder(
// Output from first window
KV.of("a", 2L),
KV.of("b", 1L),
KV.of("c", 1L),
// Output from second window
KV.of("c", 1L));
p.run();
}
Testar pipelines de streaming
Os pipelines de streaming contêm suposições que definem como processar dados ilimitados. Essas suposições geralmente são sobre a pontualidade dos dados em condições reais e, portanto, têm impacto na correção, dependendo se as suposições são verdadeiras ou falsas. O ideal é que os testes de integração para pipelines de streaming incluam testes que simulem a natureza não determinista da chegada de dados de streaming.
Para
ativar esses testes,
o SDK do Apache Beam fornece a classe
TestStream
a fim de modelar os efeitos da velocidade do elemento (dados antecipados, pontuais ou atrasados) nos
resultados do pipeline de dados. Use esses testes com a classe
PAssert
para verificar os resultados esperados.
TestStream
é compatível com o Direct Runner e o
Dataflow Runner. A amostra de código a seguir cria uma
transformação TestStream
:
final Duration WINDOW_DURATION = Duration.standardMinutes(3);
@Rule
public final transient TestPipeline p = TestPipeline.create();
@Test
@Category(NeedsRunner.class)
public void testDroppedLateData() {
TestStream<String> input = TestStream.create(StringUtf8Coder.of())
// Add elements arriving before the watermark
.addElements(
TimestampedValue.of("a", new Instant(0L)),
TimestampedValue.of("a", new Instant(0L)),
TimestampedValue.of("b", new Instant(0L)),
TimestampedValue.of("c", new Instant(0L).plus(Duration.standardMinutes(3))))
// Advance the watermark past the end of the window
.advanceWatermarkTo(new Instant(0L).plus(WINDOW_DURATION).plus(Duration.standardMinutes(1)))
// Add elements which will be dropped due to lateness
.addElements(
TimestampedValue.of("c", new Instant(0L)))
// Advance the watermark to infinity which will close all windows
.advanceWatermarkToInfinity();
PCollection<KV<String, Long>> windowedCount =
p.apply(input)
.apply(Window.into(FixedWindows.of(WINDOW_DURATION)))
.apply(Count.perElement());
PAssert.that(windowedCount)
.containsInAnyOrder(
// Output from first window
KV.of("a", 2L),
KV.of("b", 1L),
KV.of("c", 1L));
p.run();
}
Para mais informações sobre TestStream
, consulte
Como testar pipelines ilimitados no Apache Beam.
Para mais informações sobre como usar o SDK do Apache Beam para testes de unidade, consulte a
documentação do Apache Beam.
Usar os serviços do Google Cloud em testes de integração
O
Direct Runner
pode ser integrado aos serviços do Google Cloud para que testes ad hoc no ambiente local e testes de integração do
sistema usem o Pub/Sub, o BigQuery e outros serviços
conforme necessário. Quando você usa o Direct Runner, o pipeline é executado como a conta de usuário
configurada usando a ferramenta de linha de comando
gcloud
ou como uma conta de serviço que você especificou usando a
variável de ambiente
GOOGLE_APPLICATION_CREDENTIALS
. Portanto, é preciso conceder permissões
suficientes a essa conta para todos os recursos necessários antes de executar o
pipeline. Para mais informações, consulte
Segurança
e permissões do Dataflow.
Para testes de integração totalmente locais, é possível usar emuladores locais para alguns serviços do Google Cloud. Os emuladores locais estão disponíveis para Pub/Sub e Bigtable.
Para testes de integração do sistema de pipelines de streaming, use o método
setBlockOnRun
(definido na interface DirectOptions
) para que o Direct Runner execute o pipeline de maneira assíncrona.
Caso contrário, a execução do pipeline bloqueará o processo pai de chamada (por
exemplo, um script no pipeline de build) até que o pipeline seja interrompido
manualmente. Se você executar o pipeline de maneira assíncrona, use a instância
PipelineResult
retornada para cancelar a execução do pipeline, conforme mostrado no exemplo de código
a seguir:
public interface StreamingIntegrationTestOptions extends
DirectOptions, StreamingOptions, MyOtherPipelineOptions {
...
}
@Rule
public final transient TestPipeline p = TestPipeline.create();
@Test
@Category(NeedsRunner.class)
public void testNonBlockingPipeline() {
StreamingIntegrationTestOptions options =
p.getOptions().as(StreamingIntegrationOptions.class);
options.setBlockOnRun(false); // Set non-blocking pipeline execution
options.setStreaming(true); // Set streaming mode
p.apply(...); // Apply pipeline transformations
PipelineResult result = p.run(); // Run the pipeline
// Generate input, verify output, etc
...
// Later on, cancel the pipeline using the previously returned
result.cancel();
}
Testes completos
Testes completos verificam a operação correta do pipeline completo executando-o no Dataflow Runner em condições semelhantes à produção. Os testes verificam se a lógica de negócios funciona corretamente usando o Dataflow Runner e se o pipeline tem o desempenho esperado em cargas semelhantes às de produção. Normalmente, os testes completos são executados em um projeto dedicado do Google Cloud que foi designado como o ambiente de pré-produção.
Para testar o pipeline em diferentes escalas, use diferentes tipos de testes completos, por exemplo:
- Execute testes completos de pequena escala com uma pequena proporção (como 1%) do conjunto de dados de teste para validar rapidamente a funcionalidade do pipeline no ambiente de pré-produção.
- Execute testes completos em um conjunto de dados de teste completo para validar a funcionalidade do pipeline em volumes e condições de dados semelhantes à de produção.
Para pipelines de streaming, recomendamos que você execute pipelines de teste em paralelo com o pipeline de produção se eles puderem usar os mesmos dados. Isso permite comparar os resultados e o comportamento operacional, como escalonamento automático e desempenho.
Os testes completos ajudam a prever como o pipeline atenderá aos SLOs de produção. O ambiente de pré-produção testa o pipeline em condições semelhantes à produção. Em testes completos, os pipelines são executados usando o Dataflow Runner para processar conjuntos de dados de referência completos que correspondem (ou são muito semelhantes a) conjuntos de dados na produção.
Talvez não seja possível gerar dados sintéticos para testes que simulem com precisão dados reais. Para resolver esse problema, uma abordagem é usar extrações limpas de fontes de dados de produção para criar conjuntos de dados de referência, em que todos os dados confidenciais são desidentificados por meio de transformações apropriadas. Recomendamos usar a Proteção de dados sensíveis para essa finalidade. A Proteção de dados sensíveis pode detectar dados sensíveis em vários tipos de conteúdo e fontes de dados e aplicar várias técnicas de desidentificação, incluindo encobrimento, mascaramento, criptografia de preservação de formato e mudança de data.
Diferenças em testes completos para pipelines em lote e de streaming
Antes de executar um teste completo em um grande conjunto de dados de teste,
faça um teste com uma porcentagem menor desses dados, por exemplo,
um por cento, e verifique o comportamento esperado em um tempo mais curto. Assim
como nos testes de integração que usam o Direct Runner, é possível usar PAssert
em objetos
PCollection
ao executar pipelines usando o
Dataflow Runner. Para saber mais sobre PAssert
, consulte a seção
Testes de unidade nesta página.
Dependendo do caso de uso, a verificação de saídas muito grandes de testes completos pode ser inviável, cara ou complicada. Nesse caso, é possível verificar amostras representativas do conjunto de resultados de saída. Por exemplo, é possível usar o BigQuery para testar e comparar linhas de saída a um conjunto de dados de referência dos resultados esperados.
Para pipelines de streaming, a simulação de condições realistas de streaming com dados sintéticos pode complicada. Uma maneira comum de fornecer dados de streaming para testes completos é integrar testes a fontes de dados de produção. Se você estiver usando o Pub/Sub como fonte de dados, poderá ativar um fluxo de dados separado para testes completos usando assinaturas adicionais de tópicos existentes. Em seguida, é possível comparar os resultados de pipelines diferentes que consomem os mesmos dados, o que é útil para comparar pipelines candidatos a outros pipelines de pré-produção e produção.
O diagrama a seguir mostra como esse método permite que um pipeline de produção e um pipeline de teste sejam executados em paralelo em diferentes ambientes de implantação.
No diagrama, os dois pipelines leem o mesmo tópico do Pub/Sub, mas usam assinaturas separadas. Isso permite que os dois pipelines processem os mesmos dados de maneira independente e permite comparar os resultados. O pipeline de teste usa uma conta de serviço separada do projeto de produção e, portanto, evita o uso da cota de assinante do Pub/Sub no projeto de produção.
Ao contrário dos pipelines em lote, os pipelines de streaming continuam em execução até serem cancelados explicitamente. Nos testes completos, você precisa decidir se quer deixar o pipeline em execução, talvez até a execução do próximo teste completo, ou cancelar o pipeline em um momento que represente a conclusão do teste. Assim, é possível inspecionar os resultados.
O tipo de dados de teste que você usa influencia essa decisão. Por exemplo, se você usar um conjunto vinculado de dados de teste fornecido ao pipeline de streaming, poderá cancelar o pipeline quando todos os elementos tiverem concluído o processamento. Como alternativa, se você usar uma fonte de dados real (como um tópico do Pub/Sub usado na produção) ou gerar dados de teste continuamente, é preciso manter os pipelines de teste em execução por um período maior. A última opção permite comparar o comportamento com o ambiente de produção ou até mesmo com outros pipelines de teste.