Nesta página, descrevemos as práticas recomendadas para desenvolver pipelines do Dataflow. O uso dessas práticas recomendadas tem os seguintes benefícios:
- Melhore a observabilidade e o desempenho do pipeline
- Melhora na produtividade dos desenvolvedores
- Aprimorar a capacidade de teste de pipelines
Os exemplos de código do Apache Beam nesta página usam Java, mas o conteúdo se aplica aos SDKs do Apache Beam Java, Python e Go.
Perguntas a serem consideradas
Ao projetar o pipeline, considere as seguintes perguntas:
- Onde os dados de entrada do pipeline são armazenados? Quantos conjuntos de dados de entrada existem?
- Como são seus dados?
- O que você quer fazer com os dados?
- Para onde devem ir os dados de saída do pipeline?
- Seu job do Dataflow usa o Assured Workloads?
Usar modelos
Para acelerar o desenvolvimento do pipeline, em vez de criar um pipeline escrevendo um código do Apache Beam, use um modelo do Dataflow quando possível. Os modelos têm os seguintes benefícios:
- Os modelos são reutilizáveis.
- Os modelos permitem personalizar cada job mudando parâmetros de pipeline específicos.
- Assim, qualquer pessoa a quem você fornecer permissões poderá usar o modelo para implantar o pipeline. Por exemplo, um desenvolvedor pode criar um job de um modelo, e um cientista de dados na organização pode implantar esse modelo posteriormente.
É possível usar um modelo fornecido pelo Google ou criar seu próprio modelo. Alguns modelos fornecidos pelo Google permitem adicionar lógica personalizada como uma etapa do pipeline. Por exemplo, a assinatura do Pub/Sub para o modelo do BigQuery fornece um parâmetro para executar uma função JavaScript definida pelo usuário (UDF, na sigla em inglês) que é armazenada no Cloud Storage.
Os modelos fornecidos pelo Google são de código aberto sob a licença Apache 2.0, para que você possa usá-los como base para novos pipelines. Os modelos também são úteis como exemplos de código. Veja o código do modelo no repositório do GitHub (link em inglês).
Assured Workloads
O Assured Workloads ajuda a aplicar os requisitos de segurança e conformidade para clientes do Google Cloud. Por exemplo, Regiões e suporte da UE com controles de soberania ajudam a aplicar residências e garantias de soberania de dados para clientes na UE. Para fornecer esses recursos do Dataflow, alguns deles são restritos ou limitados. Se você usa o Assured Workloads com o Dataflow, todos os recursos acessados pelo pipeline precisam estar no projeto ou pasta do Assured Workloads da organização. Esses recursos incluem:
- Buckets do Cloud Storage
- Conjuntos de dados do BigQuery
- Tópicos e assinaturas do Pub/Sub
- Conjuntos de dados do Firestore
- Conectores de E/S
No Dataflow, para os jobs de streaming criados após 7 de março de 2024, todos os dados do usuário são criptografados com CMEK.
Para jobs de streaming criados antes de 7 de março de 2024, as chaves de dados usadas em operações baseadas em chaves, como janelamento, agrupamento e mesclagem, não são protegidas pela criptografia CMEK. Para ativar essa criptografia nos jobs, drene ou cancele o job e reinicie-o. Para mais informações, consulte Criptografia de artefatos de estado do pipeline.
Compartilhar dados entre pipelines
Não há nenhum mecanismo de comunicação cruzada de pipeline específico do Dataflow para compartilhamento de dados ou processamento de contexto entre pipelines. Use um armazenamento durável, como o Cloud Storage, ou um cache na memória, como o App Engine, para compartilhar dados entre instâncias de pipelines.
Programar jobs
É possível automatizar a execução do pipeline das seguintes maneiras:
- Usar o Cloud Scheduler.
- Use o operador do Dataflow do Apache Airflow, uma das muitas opções de operadores do Google Cloud em um fluxo de trabalho do Cloud Composer.
- Executar processos de (cron) job personalizados no Compute Engine
Práticas recomendadas para escrever código de pipeline
As seções a seguir fornecem práticas recomendadas para criar pipelines escrevendo o código do Apache Beam.
Estruturar seu código do Apache Beam
Para criar pipelines, é comum usar a transformação genérica do Apache Beam de processamento paralelo ParDo
.
Ao aplicar uma transformação ParDo
, você fornece o código de usuário na forma de um
objeto DoFn
. DoFn
é uma classe do SDK do Apache Beam que define uma função
de processamento distribuído.
Pense no código DoFn
como pequenas entidades independentes: várias instâncias
podem estar em execução em diferentes máquinas, sem que
haja conhecimento umas das outras. Dessa forma, recomendamos a criação de funções puras, que
são ideais para a natureza paralela e distribuída dos elementos DoFn
.
Funções puras têm as seguintes características:
- As funções puras não dependem do estado oculto ou externo.
- Eles não têm efeitos colaterais observáveis.
- Eles são determinísticos.
O modelo de função pura não é estritamente rígido. Quando seu código não depende de
coisas que não são garantidas pelo serviço Dataflow, as informações
de estado ou os dados de inicialização externos podem ser válidos para DoFn
e outros
objetos de função.
Ao estruturar as transformações ParDo
e criar os elementos DoFn
,
considere as seguintes diretrizes:
- Quando você usaprocessamento único , o serviço Dataflow garante que todos os elementos da entrada
PCollection
é processada por umDoFn
instânciaexatamente uma vez , - O serviço Dataflow não garante quantas vezes uma
DoFn
é invocada. - O serviço Dataflow não garante exatamente como os elementos distribuídos são agrupados. Isso não garante quais elementos, se houver, serão processados juntos.
- O serviço do Dataflow não garante o número exato de
instâncias de
DoFn
criadas ao longo de um pipeline. - O serviço Dataflow é tolerante a falhas e pode repetir o código várias vezes se os workers encontrarem problemas.
- O serviço Dataflow pode criar cópias de backup do seu código. Podem ocorrer problemas com efeitos colaterais manuais, por exemplo, se o código depender de arquivos temporários com nomes não exclusivos ou criá-los.
- O serviço do Dataflow serializa o processamento de elemento por
instância de
DoFn
. Seu código não precisa ser estritamente seguro para linhas de execução, mas qualquer estado compartilhado entre várias instâncias deDoFn
precisa ser seguro para linhas de execução.
Criar bibliotecas de transformações reutilizáveis
O modelo de programação do Apache Beam permite reutilizar transformações. Ao criar uma biblioteca compartilhada de transformações comuns, é possível melhorar a reutilização, a testabilidade e a propriedade do código por diferentes equipes.
Considere os dois exemplos de código Java a seguir, que leem eventos de pagamento. Supondo que ambos os pipelines executem o mesmo processamento, eles podem usar as mesmas transformações de uma biblioteca compartilhada para as etapas de processamento restantes.
O primeiro exemplo é de uma fonte ilimitada do Pub/Sub:
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options)
// Initial read transform
PCollection<PaymentEvent> payments =
p.apply("Read from topic",
PubSubIO.readStrings().withTimestampAttribute(...).fromTopic(...))
.apply("Parse strings into payment events",
ParDo.of(new ParsePaymentEventFn()));
O segundo exemplo é proveniente de uma fonte limitada de banco de dados relacional:
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
PCollection<PaymentEvent> payments =
p.apply(
"Read from database table",
JdbcIO.<PaymentEvent>read()
.withDataSourceConfiguration(...)
.withQuery(...)
.withRowMapper(new RowMapper<PaymentEvent>() {
...
}));
A implementação das práticas recomendadas de reutilização de código varia de acordo com a linguagem de programação e a ferramenta de build. Por exemplo, se você usa o Maven, é possível separar o código de transformação no próprio módulo. Em seguida, você pode incluir o módulo como um submódulo em projetos com vários módulos maiores para pipelines diferentes, conforme mostrado no exemplo de código a seguir:
// Reuse transforms across both pipelines
payments
.apply("ValidatePayments", new PaymentTransforms.ValidatePayments(...))
.apply("ProcessPayments", new PaymentTransforms.ProcessPayments(...))
...
Para mais informações, consulte as seguintes páginas de documentação do Apache Beam:
- Requisitos para escrever código de usuário para transformações do Apache Beam
- Guia de estilo
PTransform
(link em inglês): um guia de estilo para escritores de novas coleçõesPTransform
reutilizáveis.
Usar filas de mensagens inativas para tratamento de erros
Às vezes, o pipeline não processa elementos. Problemas de dados são uma causa comum. Por exemplo, um elemento que contém JSON mal formatado pode causar falhas de análise.
Embora você possa capturar exceções no método
DoFn.ProcessElement
,
registrar o erro e descartar o elemento, essa abordagem perde os dados
e impede que sejam inspecionados posteriormente para manuseio manual ou solução de problemas.
Em vez disso, use um padrão conhecido como fila de mensagens inativas (fila de mensagens não processadas).
Capture exceções no método DoFn.ProcessElement
e registre
erros. Em vez de remover o elemento com falha,
use saídas de ramificação para gravá-lo em um objeto PCollection
separado. Esses elementos são gravados em um coletor de dados para inspeção e processamento posteriores com uma transformação separada.
O exemplo de código Java a seguir mostra como implementar o padrão de fila de mensagens inativas.
TupleTag<Output> successTag = new TupleTag<>() {};
TupleTag<Input> deadLetterTag = new TupleTag<>() {};
PCollection<Input> input = /* ... */;
PCollectionTuple outputTuple =
input.apply(ParDo.of(new DoFn<Input, Output>() {
@Override
void processElement(ProcessContext c) {
try {
c.output(process(c.element()));
} catch (Exception e) {
LOG.severe("Failed to process input {} -- adding to dead-letter file",
c.element(), e);
c.sideOutput(deadLetterTag, c.element());
}
}).withOutputTags(successTag, TupleTagList.of(deadLetterTag)));
// Write the dead-letter inputs to a BigQuery table for later analysis
outputTuple.get(deadLetterTag)
.apply(BigQueryIO.write(...));
// Retrieve the successful elements...
PCollection<Output> success = outputTuple.get(successTag);
// and continue processing ...
É possível usar o Cloud Monitoring para aplicar diferentes políticas de monitoramento e alerta na fila de mensagens inativas do pipeline. Por exemplo, é possível visualizar o número e o tamanho dos elementos processados pela transformação de mensagens inativas e configurar os alertas para serem acionados se determinadas condições de limite forem atendidas.
Gerenciar mutações de esquema
É possível processar dados que tenham esquemas inesperados (mas válidos) usando um padrão de mensagens inativas,
que grava elementos com falha em um objeto PCollection
separado.
Em alguns casos, você quer processar automaticamente os elementos
que refletem um esquema modificado como elementos válidos. Por exemplo, se o esquema
de um elemento refletir uma mutação, como a adição de novos campos, será possível adaptar o
esquema do coletor de dados para acomodar mutações.
A mutação automática de esquema depende da abordagem de ramificação e da saída usada pelo padrão de mensagens inativas. No entanto, nesse caso, ele aciona uma transformação que modifica o esquema de destino sempre que esquemas aditivos forem encontrados. Para ver um exemplo dessa abordagem, consulte Como processar a modificação de esquemas JSON em um pipeline de streaming, com a Square Enix no blog do Google Cloud.
Decidir como mesclar conjuntos de dados
A junção de conjuntos de dados é um caso de uso comum para pipelines de dados. É possível usar
entradas secundárias ou a transformação CoGroupByKey
para realizar mesclagens no pipeline.
Cada uma tem vantagens e desvantagens.
As entradas secundárias
fornecem uma maneira flexível de resolver problemas comuns de processamento de dados, como
enriquecimento de dados e pesquisas com chaves. Ao contrário dos objetos PCollection
, as entradas secundárias são
mutáveis e podem ser determinadas no momento da execução. Por exemplo, os valores em uma
entrada secundária podem ser calculados por outra ramificação no pipeline ou determinados
chamando um serviço remoto.
O Dataflow é compatível com entradas secundárias usando a persistência de dados no armazenamento permanente (semelhante a um disco compartilhado), o que disponibiliza a entrada secundária completa para todos os workers.
Os tamanhos de entrada secundária podem ser muito grandes e não caberem na memória do worker. A leitura de uma entrada secundária grande pode causar problemas de desempenho se os workers precisarem ler constantemente do armazenamento permanente.
A transformação
CoGroupByKey
é uma
transformação principal do Apache Beam
que combina (nivela) vários objetos PCollection
e elementos de grupos que
tenham uma chave comum. Diferente de uma entrada secundária, que disponibiliza todos os dados de entrada secundária
para cada worker, a CoGroupByKey
executa uma operação de embaralhamento (agrupamento)
para distribuir dados entre os workers. Portanto, CoGroupByKey
é ideal quando os
objetos PCollection
que você quer mesclar são muito grandes e não se encaixam na memória
do worker.
Siga estas diretrizes para decidir se quer usar entradas secundárias ou
CoGroupByKey
:
- Use entradas secundárias quando um dos objetos
PCollection
que você está mesclando for desproporcionalmente menor que o outro e o objetoPCollection
menor se encaixar na memória do worker. Armazenar toda a entrada secundária em cache na memória torna a busca de elementos rápida e eficiente. - Use entradas secundárias quando tiver um objeto
PCollection
que deva ser unido várias vezes no pipeline. Em vez de usar várias transformaçõesCoGroupByKey
, crie uma única entrada secundária que possa ser reutilizada por várias transformaçõesParDo
. - Use
CoGroupByKey
se você precisar buscar uma grande proporção de um objetoPCollection
que exceda significativamente a memória do worker.
Para mais informações, consulte Resolver problemas de falta de memória no Dataflow.
Minimizar operações caras por elemento
Uma instância DoFn
processa lotes de elementos chamados
pacotes,
(unidades atômicas de trabalho que consistem em zero ou mais
elementos). Os elementos individuais são processados pelo método
DoFn.ProcessElement
,
que é executado para cada elemento. Como o método DoFn.ProcessElement
é chamado para cada elemento, qualquer operação demorada ou de computação
cara invocada por esse método
é executada para cada elemento processado.
Se você precisar executar operações caras apenas uma vez para um lote de elementos,
inclua-as nos métodos DoFn.Setup
ou DoFn.StartBundle
,
em vez de no elemento DoFn.ProcessElement
. Os exemplos incluem as
operações a seguir:
Análise de um arquivo de configuração que controla algum aspecto do comportamento da instância
DoFn
. Invoque essa ação apenas uma vez, quando a instânciaDoFn
for inicializada usando o métodoDoFn.Setup
.Ao instanciar um cliente de curta duração que é reutilizado em todos os elementos de um pacote, por exemplo, quando todos os elementos do pacote são enviados por uma única conexão de rede. Invoque essa ação uma vez por pacote usando o método
DoFn.StartBundle
.
Limitar os tamanhos de lotes e as chamadas simultâneas a serviços externos
Ao chamar serviços externos, você pode reduzir as sobrecargas por chamada usando a
transformação
GroupIntoBatches
. Esta transformação cria lotes de elementos de um tamanho especificado.
O envio em lote envia
elementos para um serviço externo como um payload em vez de
individualmente.
Em combinação com lotes, é possível limitar o número máximo de chamadas paralelas (simultâneas) ao serviço externo escolhendo as chaves apropriadas para particionar os dados recebidos. O número de partições determina o carregamento em paralelo máximo. Por exemplo, se cada elemento receber a mesma chave, uma transformação downstream para chamar o serviço externo não será executada em paralelo.
Considere uma das seguintes abordagens para produzir chaves para elementos:
- Escolha um atributo do conjunto de dados para usar como chaves de dados, como IDs de usuário.
- Gere chaves de dados para dividir elementos de maneira aleatória em um número fixo de
partições, em que o número possível de chaves-valor determina o número
de partições. Você precisa criar partições suficientes para paralelismo.
Cada partição precisa ter elementos suficientes para que a transformação
GroupIntoBatches
seja útil.
No exemplo de código Java a seguir, mostramos como dividir aleatoriamente elementos em 10 partições:
// PII or classified data which needs redaction.
PCollection<String> sensitiveData = ...;
int numPartitions = 10; // Number of parallel batches to create.
PCollection<KV<Long, Iterable<String>>> batchedData =
sensitiveData
.apply("Assign data into partitions",
ParDo.of(new DoFn<String, KV<Long, String>>() {
Random random = new Random();
@ProcessElement
public void assignRandomPartition(ProcessContext context) {
context.output(
KV.of(randomPartitionNumber(), context.element()));
}
private static int randomPartitionNumber() {
return random.nextInt(numPartitions);
}
}))
.apply("Create batches of sensitive data",
GroupIntoBatches.<Long, String>ofSize(100L));
// Use batched sensitive data to fully utilize Redaction API,
// which has a rate limit but allows large payloads.
batchedData
.apply("Call Redaction API in batches", callRedactionApiOnBatch());
Identificar problemas de desempenho causados por etapas com a combinação inadequada
O Dataflow cria um gráfico de etapas que representa o pipeline, com base nas transformações e nos dados usados para criá-lo. Isso é chamado de gráfico de execução de pipeline.
Quando você implanta o pipeline, o Dataflow pode modificar
o gráfico de execução do pipeline para melhorar o desempenho. Por exemplo, o Dataflow
pode unir algumas operações, um processo conhecido como
otimização de fusão,
para evitar o impacto de desempenho e custo de cada gravação de objeto intermediário
PCollection
no pipeline.
Em alguns casos, o Dataflow pode determinar incorretamente a melhor forma de fundir operações no pipeline, o que pode limitar a capacidade do serviço do Dataflow de usar todos os workers disponíveis. Nesses casos, é possível impedir a fusão das operações.
Considere o exemplo de código do Apache Beam a seguir. Uma transformação
GenerateSequence
cria um pequeno objeto PCollection
limitado, que depois será processado
por duas transformações ParDo
downstream.
A transformação Find Primes Less-than-N
pode ser cara em termos de computação e provavelmente
será executada lentamente para números grandes. Em contraste, a transformação Increment Number
provavelmente é concluída rapidamente.
import com.google.common.math.LongMath;
...
public class FusedStepsPipeline {
final class FindLowerPrimesFn extends DoFn<Long, String> {
@ProcessElement
public void processElement(ProcessContext c) {
Long n = c.element();
if (n > 1) {
for (long i = 2; i < n; i++) {
if (LongMath.isPrime(i)) {
c.output(Long.toString(i));
}
}
}
}
}
public static void main(String[] args) {
Pipeline p = Pipeline.create(options);
PCollection<Long> sequence = p.apply("Generate Sequence",
GenerateSequence
.from(0)
.to(1000000));
// Pipeline branch 1
sequence.apply("Find Primes Less-than-N",
ParDo.of(new FindLowerPrimesFn()));
// Pipeline branch 2
sequence.apply("Increment Number",
MapElements.via(new SimpleFunction<Long, Long>() {
public Long apply(Long n) {
return ++n;
}
}));
p.run().waitUntilFinish();
}
}
O diagrama a seguir mostra uma representação gráfica do pipeline na interface de monitoramento do Dataflow.
A interface de monitoramento do Dataflow mostra que a mesma taxa lenta de processamento ocorre para as duas transformações, especificamente 13 elementos por segundo. Espera-se que a transformação Increment Number
processe
elementos rapidamente, mas parece que ela está vinculada à mesma taxa de
processamento que Find Primes Less-than-N
.
O motivo é que o Dataflow mesclava as etapas em uma única
fase, o que as impedia de serem executadas de maneira independente. Use o comando gcloud dataflow jobs describe
para encontrar mais informações:
gcloud dataflow jobs describe --full job-id --format json
Na saída resultante, as etapas combinadas são descritas no objeto
ExecutionStageSummary
na matriz
ComponentTransform
:
...
"executionPipelineStage": [
{
"componentSource": [
...
],
"componentTransform": [
{
"name": "s1",
"originalTransform": "Generate Sequence/Read(BoundedCountingSource)",
"userName": "Generate Sequence/Read(BoundedCountingSource)"
},
{
"name": "s2",
"originalTransform": "Find Primes Less-than-N",
"userName": "Find Primes Less-than-N"
},
{
"name": "s3",
"originalTransform": "Increment Number/Map",
"userName": "Increment Number/Map"
}
],
"id": "S01",
"kind": "PAR_DO_KIND",
"name": "F0"
}
...
Nesse cenário, a transformação Find Primes Less-than-N
é a etapa lenta. Portanto,
a quebra da mesclagem antes dessa etapa é uma estratégia apropriada. Um método para
separar as etapas é inserir uma transformação
GroupByKey
e realizar o desagrupamento antes da etapa, conforme mostrado no exemplo de código Java
a seguir:
sequence
.apply("Map Elements", MapElements.via(new SimpleFunction<Long, KV<Long, Void>>() {
public KV<Long, Void> apply(Long n) {
return KV.of(n, null);
}
}))
.apply("Group By Key", GroupByKey.<Long, Void>create())
.apply("Emit Keys", Keys.<Long>create())
.apply("Find Primes Less-than-N", ParDo.of(new FindLowerPrimesFn()));
Também é possível combinar essas etapas em uma transformação composta reutilizável.
Depois de cancelar a fusão das etapas, ao executar o pipeline, Increment Number
é concluído em questão de segundos e a transformação Find Primes Less-than-N
, que é muito mais longa, é executada em um estágio separado.
Este exemplo aplica uma operação de agrupamento e desagrupamento para propagar as etapas.
É possível usar outras abordagens para outras circunstâncias. Nesse caso, o processamento
da saída duplicada não é um problema, dada a saída consecutiva da
transformação
GenerateSequence
.
Objetos KV
com chaves duplicadas são desduplicados para uma única chave na transformação de grupo (GroupByKey
) e no desagrupar (Keys
). Para reter cópias após as operações de agrupar e desagrupar,
crie pares de chave-valor usando as seguintes etapas:
- Use uma chave aleatória e a entrada original como o valor.
- Agrupar usando a chave aleatória.
- Emita os valores de cada chave como saída.
Você também pode usar uma transformação Reshuffle
para evitar a fusão de transformações ao redor. No entanto, os efeitos colaterais da
transformação Reshuffle
não são portáteis para diferentes
executores do Apache Beam.
Para mais informações sobre paralelismo e otimização de fusão, consulte Ciclo de vida do pipeline.
Usar métricas do Apache Beam para coletar insights do pipeline
As métricas do Apache Beam são uma classe de utilitário que produz métricas para relatar as propriedades de um pipeline em execução. Quando você usa o Cloud Monitoring, as métricas do Apache Beam ficam disponíveis como métricas personalizadas do Cloud Monitoring.
O exemplo a seguir mostra as métricas Counter
do Apache Beam usadas em uma subclasse DoFn
.
O código de exemplo usa dois contadores. Um contador rastreia falhas de análise JSON
(malformedCounter
) e o outro monitora se a mensagem JSON é
válida, mas contém um payload vazio (emptyCounter
). No Cloud Monitoring,
os nomes das métricas personalizadas são custom.googleapis.com/dataflow/malformedJson
e
custom.googleapis.com/dataflow/emptyPayload
. Use as métricas personalizadas
para criar visualizações e políticas de alertas no Cloud Monitoring.
final TupleTag<String> errorTag = new TupleTag<String>(){};
final TupleTag<MockObject> successTag = new TupleTag<MockObject>(){};
final class ParseEventFn extends DoFn<String, MyObject> {
private final Counter malformedCounter = Metrics.counter(ParseEventFn.class, "malformedJson");
private final Counter emptyCounter = Metrics.counter(ParseEventFn.class, "emptyPayload");
private Gson gsonParser;
@Setup
public setup() {
gsonParser = new Gson();
}
@ProcessElement
public void processElement(ProcessContext c) {
try {
MyObject myObj = gsonParser.fromJson(c.element(), MyObject.class);
if (myObj.getPayload() != null) {
// Output the element if non-empty payload
c.output(successTag, myObj);
}
else {
// Increment empty payload counter
emptyCounter.inc();
}
}
catch (JsonParseException e) {
// Increment malformed JSON counter
malformedCounter.inc();
// Output the element to dead-letter queue
c.output(errorTag, c.element());
}
}
}
Saiba mais
Nas páginas a seguir, você encontra mais informações sobre como estruturar o pipeline, como escolher quais transformações aplicar aos dados e o que considerar ao escolher os métodos de entrada e saída do pipeline.
Para mais informações sobre como criar seu código de usuário, consulte os requisitos para funções fornecidas pelo usuário.