Práticas recomendadas para pipelines do Dataflow

Nesta página, descrevemos as práticas recomendadas para desenvolver pipelines do Dataflow. O uso dessas práticas recomendadas tem os seguintes benefícios:

  • Melhore a observabilidade e o desempenho do pipeline
  • Melhora na produtividade dos desenvolvedores
  • Aprimorar a capacidade de teste de pipelines

Os exemplos de código do Apache Beam nesta página usam Java, mas o conteúdo se aplica aos SDKs do Apache Beam Java, Python e Go.

Perguntas a serem consideradas

Ao projetar o pipeline, considere as seguintes perguntas:

  • Onde os dados de entrada do pipeline são armazenados? Quantos conjuntos de dados de entrada existem?
  • Como são seus dados?
  • O que você quer fazer com os dados?
  • Para onde devem ir os dados de saída do pipeline?
  • Seu job do Dataflow usa o Assured Workloads?

Usar modelos

Para acelerar o desenvolvimento do pipeline, em vez de criar um pipeline escrevendo um código do Apache Beam, use um modelo do Dataflow quando possível. Os modelos têm os seguintes benefícios:

  • Os modelos são reutilizáveis.
  • Os modelos permitem personalizar cada job mudando parâmetros de pipeline específicos.
  • Assim, qualquer pessoa a quem você fornecer permissões poderá usar o modelo para implantar o pipeline. Por exemplo, um desenvolvedor pode criar um job de um modelo, e um cientista de dados na organização pode implantar esse modelo posteriormente.

É possível usar um modelo fornecido pelo Google ou criar seu próprio modelo. Alguns modelos fornecidos pelo Google permitem adicionar lógica personalizada como uma etapa do pipeline. Por exemplo, a assinatura do Pub/Sub para o modelo do BigQuery fornece um parâmetro para executar uma função JavaScript definida pelo usuário (UDF, na sigla em inglês) que é armazenada no Cloud Storage.

Os modelos fornecidos pelo Google são de código aberto sob a licença Apache 2.0, para que você possa usá-los como base para novos pipelines. Os modelos também são úteis como exemplos de código. Veja o código do modelo no repositório do GitHub (link em inglês).

Assured Workloads

O Assured Workloads ajuda a aplicar os requisitos de segurança e conformidade para clientes do Google Cloud. Por exemplo, Regiões e suporte da UE com controles de soberania ajudam a aplicar residências e garantias de soberania de dados para clientes na UE. Para fornecer esses recursos do Dataflow, alguns deles são restritos ou limitados. Se você usa o Assured Workloads com o Dataflow, todos os recursos acessados pelo pipeline precisam estar no projeto ou pasta do Assured Workloads da organização. Esses recursos incluem:

  • Buckets do Cloud Storage
  • Conjuntos de dados do BigQuery
  • Tópicos e assinaturas do Pub/Sub
  • Conjuntos de dados do Firestore
  • Conectores de E/S

No Dataflow, para os jobs de streaming criados após 7 de março de 2024, todos os dados do usuário são criptografados com CMEK.

Para jobs de streaming criados antes de 7 de março de 2024, as chaves de dados usadas em operações baseadas em chaves, como janelamento, agrupamento e mesclagem, não são protegidas pela criptografia CMEK. Para ativar essa criptografia nos jobs, drene ou cancele o job e reinicie-o. Para mais informações, consulte Criptografia de artefatos de estado do pipeline.

Compartilhar dados entre pipelines

Não há nenhum mecanismo de comunicação cruzada de pipeline específico do Dataflow para compartilhamento de dados ou processamento de contexto entre pipelines. Use um armazenamento durável, como o Cloud Storage, ou um cache na memória, como o App Engine, para compartilhar dados entre instâncias de pipelines.

Programar jobs

É possível automatizar a execução do pipeline das seguintes maneiras:

Práticas recomendadas para escrever código de pipeline

As seções a seguir fornecem práticas recomendadas para criar pipelines escrevendo o código do Apache Beam.

Estruturar seu código do Apache Beam

Para criar pipelines, é comum usar a transformação genérica do Apache Beam de processamento paralelo ParDo. Ao aplicar uma transformação ParDo, você fornece o código de usuário na forma de um objeto DoFn. DoFn é uma classe do SDK do Apache Beam que define uma função de processamento distribuído.

Pense no código DoFn como pequenas entidades independentes: várias instâncias podem estar em execução em diferentes máquinas, sem que haja conhecimento umas das outras. Dessa forma, recomendamos a criação de funções puras, que são ideais para a natureza paralela e distribuída dos elementos DoFn. Funções puras têm as seguintes características:

  • As funções puras não dependem do estado oculto ou externo.
  • Eles não têm efeitos colaterais observáveis.
  • Eles são determinísticos.

O modelo de função pura não é estritamente rígido. Quando seu código não depende de coisas que não são garantidas pelo serviço Dataflow, as informações de estado ou os dados de inicialização externos podem ser válidos para DoFn e outros objetos de função.

Ao estruturar as transformações ParDo e criar os elementos DoFn, considere as seguintes diretrizes:

  • Quando você usaprocessamento único , o serviço Dataflow garante que todos os elementos da entradaPCollection é processada por umDoFn instânciaexatamente uma vez ,
  • O serviço Dataflow não garante quantas vezes uma DoFn é invocada.
  • O serviço Dataflow não garante exatamente como os elementos distribuídos são agrupados. Isso não garante quais elementos, se houver, serão processados juntos.
  • O serviço do Dataflow não garante o número exato de instâncias de DoFn criadas ao longo de um pipeline.
  • O serviço Dataflow é tolerante a falhas e pode repetir o código várias vezes se os workers encontrarem problemas.
  • O serviço Dataflow pode criar cópias de backup do seu código. Podem ocorrer problemas com efeitos colaterais manuais, por exemplo, se o código depender de arquivos temporários com nomes não exclusivos ou criá-los.
  • O serviço do Dataflow serializa o processamento de elemento por instância de DoFn. Seu código não precisa ser estritamente seguro para linhas de execução, mas qualquer estado compartilhado entre várias instâncias de DoFn precisa ser seguro para linhas de execução.

Criar bibliotecas de transformações reutilizáveis

O modelo de programação do Apache Beam permite reutilizar transformações. Ao criar uma biblioteca compartilhada de transformações comuns, é possível melhorar a reutilização, a testabilidade e a propriedade do código por diferentes equipes.

Considere os dois exemplos de código Java a seguir, que leem eventos de pagamento. Supondo que ambos os pipelines executem o mesmo processamento, eles podem usar as mesmas transformações de uma biblioteca compartilhada para as etapas de processamento restantes.

O primeiro exemplo é de uma fonte ilimitada do Pub/Sub:

PipelineOptions options = PipelineOptionsFactory.create();

Pipeline p = Pipeline.create(options)

// Initial read transform
PCollection<PaymentEvent> payments =
    p.apply("Read from topic",
        PubSubIO.readStrings().withTimestampAttribute(...).fromTopic(...))
        .apply("Parse strings into payment events",
            ParDo.of(new ParsePaymentEventFn()));

O segundo exemplo é proveniente de uma fonte limitada de banco de dados relacional:

PipelineOptions options = PipelineOptionsFactory.create();

Pipeline p = Pipeline.create(options);

PCollection<PaymentEvent> payments =
    p.apply(
        "Read from database table",
        JdbcIO.<PaymentEvent>read()
            .withDataSourceConfiguration(...)
            .withQuery(...)
            .withRowMapper(new RowMapper<PaymentEvent>() {
              ...
            }));

A implementação das práticas recomendadas de reutilização de código varia de acordo com a linguagem de programação e a ferramenta de build. Por exemplo, se você usa o Maven, é possível separar o código de transformação no próprio módulo. Em seguida, você pode incluir o módulo como um submódulo em projetos com vários módulos maiores para pipelines diferentes, conforme mostrado no exemplo de código a seguir:

// Reuse transforms across both pipelines
payments
    .apply("ValidatePayments", new PaymentTransforms.ValidatePayments(...))
    .apply("ProcessPayments", new PaymentTransforms.ProcessPayments(...))
  ...

Para mais informações, consulte as seguintes páginas de documentação do Apache Beam:

Usar filas de mensagens inativas para tratamento de erros

Às vezes, o pipeline não processa elementos. Problemas de dados são uma causa comum. Por exemplo, um elemento que contém JSON mal formatado pode causar falhas de análise.

Embora você possa capturar exceções no método DoFn.ProcessElement, registrar o erro e descartar o elemento, essa abordagem perde os dados e impede que sejam inspecionados posteriormente para manuseio manual ou solução de problemas.

Em vez disso, use um padrão conhecido como fila de mensagens inativas (fila de mensagens não processadas). Capture exceções no método DoFn.ProcessElement e registre erros. Em vez de remover o elemento com falha, use saídas de ramificação para gravá-lo em um objeto PCollection separado. Esses elementos são gravados em um coletor de dados para inspeção e processamento posteriores com uma transformação separada.

O exemplo de código Java a seguir mostra como implementar o padrão de fila de mensagens inativas.

TupleTag<Output> successTag = new TupleTag<>() {};
TupleTag<Input> deadLetterTag = new TupleTag<>() {};

PCollection<Input> input = /* ... */;

PCollectionTuple outputTuple =
    input.apply(ParDo.of(new DoFn<Input, Output>() {
      @Override
      void processElement(ProcessContext c) {
        try {
          c.output(process(c.element()));
        } catch (Exception e) {
          LOG.severe("Failed to process input {} -- adding to dead-letter file",
              c.element(), e);
          c.sideOutput(deadLetterTag, c.element());
        }
      }).withOutputTags(successTag, TupleTagList.of(deadLetterTag)));

// Write the dead-letter inputs to a BigQuery table for later analysis
outputTuple.get(deadLetterTag)
    .apply(BigQueryIO.write(...));

// Retrieve the successful elements...
PCollection<Output> success = outputTuple.get(successTag);
// and continue processing ...

É possível usar o Cloud Monitoring para aplicar diferentes políticas de monitoramento e alerta na fila de mensagens inativas do pipeline. Por exemplo, é possível visualizar o número e o tamanho dos elementos processados pela transformação de mensagens inativas e configurar os alertas para serem acionados se determinadas condições de limite forem atendidas.

Gerenciar mutações de esquema

É possível processar dados que tenham esquemas inesperados (mas válidos) usando um padrão de mensagens inativas, que grava elementos com falha em um objeto PCollection separado. Em alguns casos, você quer processar automaticamente os elementos que refletem um esquema modificado como elementos válidos. Por exemplo, se o esquema de um elemento refletir uma mutação, como a adição de novos campos, será possível adaptar o esquema do coletor de dados para acomodar mutações.

A mutação automática de esquema depende da abordagem de ramificação e da saída usada pelo padrão de mensagens inativas. No entanto, nesse caso, ele aciona uma transformação que modifica o esquema de destino sempre que esquemas aditivos forem encontrados. Para ver um exemplo dessa abordagem, consulte Como processar a modificação de esquemas JSON em um pipeline de streaming, com a Square Enix no blog do Google Cloud.

Decidir como ingressar em bancos de dados

A junção de conjuntos de dados é um caso de uso comum para pipelines de dados. É possível usar entradas secundárias ou a transformação CoGroupByKey para realizar mesclagens no pipeline. Cada uma tem vantagens e desvantagens.

As entradas secundárias fornecem uma maneira flexível de resolver problemas comuns de processamento de dados, como enriquecimento de dados e pesquisas com chaves. Ao contrário dos objetos PCollection, as entradas secundárias são mutáveis e podem ser determinadas no momento da execução. Por exemplo, os valores em uma entrada secundária podem ser calculados por outra ramificação no pipeline ou determinados chamando um serviço remoto.

O Dataflow é compatível com entradas secundárias usando a persistência de dados no armazenamento permanente (semelhante a um disco compartilhado), o que disponibiliza a entrada secundária completa para todos os workers.

Os tamanhos de entrada secundária podem ser muito grandes e não caberem na memória do worker. A leitura de uma entrada secundária grande pode causar problemas de desempenho se os workers precisarem ler constantemente do armazenamento permanente.

A transformação CoGroupByKey é uma transformação principal do Apache Beam que combina (nivela) vários objetos PCollection e elementos de grupos que tenham uma chave comum. Diferente de uma entrada secundária, que disponibiliza todos os dados de entrada secundária para cada worker, a CoGroupByKey executa uma operação de embaralhamento (agrupamento) para distribuir dados entre os workers. Portanto, CoGroupByKey é ideal quando os objetos PCollection que você quer mesclar são muito grandes e não se encaixam na memória do worker.

Siga estas diretrizes para decidir se quer usar entradas secundárias ou CoGroupByKey:

  • Use entradas secundárias quando um dos objetos PCollection que você está mesclando for desproporcionalmente menor que o outro e o objeto PCollection menor se encaixar na memória do worker. Armazenar toda a entrada secundária em cache na memória torna a busca de elementos rápida e eficiente.
  • Use entradas secundárias quando tiver um objeto PCollection que deva ser unido várias vezes no pipeline. Em vez de usar várias transformações CoGroupByKey, crie uma única entrada secundária que possa ser reutilizada por várias transformações ParDo.
  • Use CoGroupByKey se você precisar buscar uma grande proporção de um objeto PCollection que exceda significativamente a memória do worker.

Para mais informações, consulte Resolver problemas de falta de memória no Dataflow.

Minimizar operações caras por elemento

Uma instância DoFn processa lotes de elementos chamados pacotes, (unidades atômicas de trabalho que consistem em zero ou mais elementos). Os elementos individuais são processados pelo método DoFn.ProcessElement, que é executado para cada elemento. Como o método DoFn.ProcessElement é chamado para cada elemento, qualquer operação demorada ou de computação cara invocada por esse método é executada para cada elemento processado.

Se você precisar executar operações caras apenas uma vez para um lote de elementos, inclua-as nos métodos DoFn.Setup ou DoFn.StartBundle, em vez de no elemento DoFn.ProcessElement. Os exemplos incluem as operações a seguir:

  • Análise de um arquivo de configuração que controla algum aspecto do comportamento da instância DoFn. Invoque essa ação apenas uma vez, quando a instância DoFn for inicializada usando o método DoFn.Setup.

  • Ao instanciar um cliente de curta duração que é reutilizado em todos os elementos de um pacote, por exemplo, quando todos os elementos do pacote são enviados por uma única conexão de rede. Invoque essa ação uma vez por pacote usando o método DoFn.StartBundle.

Limitar os tamanhos de lotes e as chamadas simultâneas a serviços externos

Ao chamar serviços externos, você pode reduzir as sobrecargas por chamada usando a transformação GroupIntoBatches. Esta transformação cria lotes de elementos de um tamanho especificado. O envio em lote envia elementos para um serviço externo como um payload em vez de individualmente.

Em combinação com lotes, é possível limitar o número máximo de chamadas paralelas (simultâneas) ao serviço externo escolhendo as chaves apropriadas para particionar os dados recebidos. O número de partições determina o carregamento em paralelo máximo. Por exemplo, se cada elemento receber a mesma chave, uma transformação downstream para chamar o serviço externo não será executada em paralelo.

Considere uma das seguintes abordagens para produzir chaves para elementos:

  • Escolha um atributo do conjunto de dados para usar como chaves de dados, como IDs de usuário.
  • Gere chaves de dados para dividir elementos de maneira aleatória em um número fixo de partições, em que o número possível de chaves-valor determina o número de partições. Você precisa criar partições suficientes para paralelismo. Cada partição precisa ter elementos suficientes para que a transformação GroupIntoBatches seja útil.

No exemplo de código Java a seguir, mostramos como dividir aleatoriamente elementos em 10 partições:

// PII or classified data which needs redaction.
PCollection<String> sensitiveData = ...;

int numPartitions = 10; // Number of parallel batches to create.
PCollection<KV<Long, Iterable<String>>> batchedData =
    sensitiveData
        .apply("Assign data into partitions",
            ParDo.of(new DoFn<String, KV<Long, String>>() {
              Random random = new Random();

              @ProcessElement
              public void assignRandomPartition(ProcessContext context) {
                context.output(
                  KV.of(randomPartitionNumber(), context.element()));
              }
              private static int randomPartitionNumber() {
                return random.nextInt(numPartitions);
              }
            }))
        .apply("Create batches of sensitive data",
            GroupIntoBatches.<Long, String>ofSize(100L));

// Use batched sensitive data to fully utilize Redaction API,
// which has a rate limit but allows large payloads.
batchedData
    .apply("Call Redaction API in batches", callRedactionApiOnBatch());

Identificar problemas de desempenho causados por etapas com a combinação inadequada

O Dataflow cria um gráfico de etapas que representa o pipeline, com base nas transformações e nos dados usados para criá-lo. Isso é chamado de gráfico de execução de pipeline.

Quando você implanta o pipeline, o Dataflow pode modificar o gráfico de execução do pipeline para melhorar o desempenho. Por exemplo, o Dataflow pode unir algumas operações, um processo conhecido como otimização de fusão, para evitar o impacto de desempenho e custo de cada gravação de objeto intermediário PCollection no pipeline.

Em alguns casos, o Dataflow pode determinar incorretamente a melhor forma de fundir operações no pipeline, o que pode limitar a capacidade do serviço do Dataflow de usar todos os workers disponíveis. Nesses casos, é possível impedir a fusão das operações.

Considere o exemplo de código do Apache Beam a seguir. Uma transformação GenerateSequence cria um pequeno objeto PCollection limitado, que depois será processado por duas transformações ParDo downstream.

A transformação Find Primes Less-than-N pode ser cara em termos de computação e provavelmente será executada lentamente para números grandes. Em contraste, a transformação Increment Number provavelmente é concluída rapidamente.

import com.google.common.math.LongMath;
...

public class FusedStepsPipeline {

  final class FindLowerPrimesFn extends DoFn<Long, String> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      Long n = c.element();
      if (n > 1) {
        for (long i = 2; i < n; i++) {
          if (LongMath.isPrime(i)) {
            c.output(Long.toString(i));
          }
        }
      }
    }
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(options);

    PCollection<Long> sequence = p.apply("Generate Sequence",
        GenerateSequence
            .from(0)
            .to(1000000));

    // Pipeline branch 1
    sequence.apply("Find Primes Less-than-N",
        ParDo.of(new FindLowerPrimesFn()));

    // Pipeline branch 2
    sequence.apply("Increment Number",
        MapElements.via(new SimpleFunction<Long, Long>() {
          public Long apply(Long n) {
            return ++n;
          }
        }));

    p.run().waitUntilFinish();
  }
}

O diagrama a seguir mostra uma representação gráfica do pipeline na interface de monitoramento do Dataflow.

Representação do fluxo do pipeline na interface do Dataflow.

A interface de monitoramento do Dataflow mostra que a mesma taxa lenta de processamento ocorre para as duas transformações, especificamente 13 elementos por segundo. Espera-se que a transformação Increment Number processe elementos rapidamente, mas parece que ela está vinculada à mesma taxa de processamento que Find Primes Less-than-N.

O motivo é que o Dataflow mesclava as etapas em uma única fase, o que as impedia de serem executadas de maneira independente. Use o comando gcloud dataflow jobs describe para encontrar mais informações:

gcloud dataflow jobs describe --full job-id --format json

Na saída resultante, as etapas combinadas são descritas no objeto ExecutionStageSummary na matriz ComponentTransform :

...

    "executionPipelineStage": [
      {
        "componentSource": [
          ...
        ],
        "componentTransform": [
          {
            "name": "s1",
            "originalTransform": "Generate Sequence/Read(BoundedCountingSource)",
            "userName": "Generate Sequence/Read(BoundedCountingSource)"
          },
          {
            "name": "s2",
            "originalTransform": "Find Primes Less-than-N",
            "userName": "Find Primes Less-than-N"
          },
          {
            "name": "s3",
            "originalTransform": "Increment Number/Map",
            "userName": "Increment Number/Map"
          }
        ],
        "id": "S01",
        "kind": "PAR_DO_KIND",
        "name": "F0"
      }

...

Nesse cenário, a transformação Find Primes Less-than-N é a etapa lenta. Portanto, a quebra da mesclagem antes dessa etapa é uma estratégia apropriada. Um método para separar as etapas é inserir uma transformação GroupByKey e realizar o desagrupamento antes da etapa, conforme mostrado no exemplo de código Java a seguir:

sequence
    .apply("Map Elements", MapElements.via(new SimpleFunction<Long, KV<Long, Void>>() {
      public KV<Long, Void> apply(Long n) {
        return KV.of(n, null);
      }
    }))
    .apply("Group By Key", GroupByKey.<Long, Void>create())
    .apply("Emit Keys", Keys.<Long>create())
    .apply("Find Primes Less-than-N", ParDo.of(new FindLowerPrimesFn()));

Também é possível combinar essas etapas em uma transformação composta reutilizável.

Depois de cancelar a fusão das etapas, ao executar o pipeline, Increment Number é concluído em questão de segundos e a transformação Find Primes Less-than-N, que é muito mais longa, é executada em um estágio separado.

Este exemplo aplica uma operação de agrupamento e desagrupamento para propagar as etapas. É possível usar outras abordagens para outras circunstâncias. Nesse caso, o processamento da saída duplicada não é um problema, dada a saída consecutiva da transformação GenerateSequence. Objetos KV com chaves duplicadas são desduplicados para uma única chave na transformação de grupo (GroupByKey) e no desagrupar (Keys). Para reter cópias após as operações de agrupar e desagrupar, crie pares de chave-valor usando as seguintes etapas:

  1. Use uma chave aleatória e a entrada original como o valor.
  2. Agrupar usando a chave aleatória.
  3. Emita os valores de cada chave como saída.

Você também pode usar uma transformação Reshuffle para evitar a fusão de transformações ao redor. No entanto, os efeitos colaterais da transformação Reshuffle não são portáteis para diferentes executores do Apache Beam.

Para mais informações sobre paralelismo e otimização de fusão, consulte Ciclo de vida do pipeline.

Usar métricas do Apache Beam para coletar insights do pipeline

As métricas do Apache Beam são uma classe de utilitário que produz métricas para relatar as propriedades de um pipeline em execução. Quando você usa o Cloud Monitoring, as métricas do Apache Beam ficam disponíveis como métricas personalizadas do Cloud Monitoring.

O exemplo a seguir mostra as métricas Counter do Apache Beam usadas em uma subclasse DoFn.

O código de exemplo usa dois contadores. Um contador rastreia falhas de análise JSON (malformedCounter) e o outro monitora se a mensagem JSON é válida, mas contém um payload vazio (emptyCounter). No Cloud Monitoring, os nomes das métricas personalizadas são custom.googleapis.com/dataflow/malformedJson e custom.googleapis.com/dataflow/emptyPayload. Use as métricas personalizadas para criar visualizações e políticas de alertas no Cloud Monitoring.

final TupleTag<String> errorTag = new TupleTag<String>(){};
final TupleTag<MockObject> successTag = new TupleTag<MockObject>(){};

final class ParseEventFn extends DoFn<String, MyObject> {

  private final Counter malformedCounter = Metrics.counter(ParseEventFn.class, "malformedJson");
  private final Counter emptyCounter = Metrics.counter(ParseEventFn.class, "emptyPayload");
  private Gson gsonParser;

  @Setup
  public setup() {
    gsonParser = new Gson();
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    try {
      MyObject myObj = gsonParser.fromJson(c.element(), MyObject.class);
      if (myObj.getPayload() != null) {
        //  Output the element if non-empty payload
        c.output(successTag, myObj);
      }
      else {
        // Increment empty payload counter
        emptyCounter.inc();
      }
    }
    catch (JsonParseException e) {
      // Increment malformed JSON counter
      malformedCounter.inc();
      // Output the element to dead-letter queue
      c.output(errorTag, c.element());
    }
  }
}

Saiba mais

Nas páginas a seguir, você encontra mais informações sobre como estruturar o pipeline, como escolher quais transformações aplicar aos dados e o que considerar ao escolher os métodos de entrada e saída do pipeline.

Para mais informações sobre como criar seu código de usuário, consulte os requisitos para funções fornecidas pelo usuário.