Criar conexões de fluxo de alterações usando o Dataflow

Nesta página, demonstramos como criar pipelines do Dataflow que consomem e encaminham dados de alteração do Spanner usando fluxos de alterações. Use o código de exemplo nesta página para criar pipelines personalizados.

Principais conceitos

Veja a seguir alguns conceitos básicos para pipelines do Dataflow para fluxo de alterações.

O Dataflow

O Dataflow é um serviço sem servidor, rápido e econômico, compatível com processamento de stream e em lote. Ele fornece portabilidade com jobs de processamento escritos usando as bibliotecas de código aberto Apache Beam e automatiza o provisionamento de infraestrutura e o gerenciamento de clusters. O Dataflow fornece streaming quase em tempo real, com aproximadamente seis segundos de latência ao ler os fluxo de alterações.

É possível usar o Dataflow para consumir fluxos de alterações do Spanner com o conector do SpannerIO, que oferece uma abstração sobre a API Spanner para consultar fluxo de alterações. Com esse conector, você não precisa gerenciar o ciclo de vida da partição de fluxo de alterações, o que é necessário ao usar a API Spanner diretamente. Ele oferece um fluxo de registros de alteração de dados para que você possa se concentrar mais na lógica do aplicativo e menos em detalhes específicos da API e no particionamento de fluxo de alterações dinâmicas. Recomendamos o uso do conector do SpannerIO em vez da API Spanner na maioria das circunstâncias em que você precisa ler dados de fluxo de alterações.

Os modelos do Dataflow são pipelines pré-criados que implementam casos de uso comuns. Consulte Modelos do Dataflow para ter uma visão geral.

Pipeline do Dataflow

Um pipeline do Dataflow de fluxo de alterações do Spanner é composto de quatro partes principais:

  1. Um banco de dados do Spanner com um fluxo de alterações
  2. O conector do SpannerIO
  3. Transformações e coletores definidos pelo usuário
  4. Um gravador de E/S de coletor

imagem

Cada um deles é discutido com mais detalhes abaixo.

Fluxo de alterações do Spanner

Para detalhes sobre como criar um fluxo de alterações, consulte Criar um fluxo de alterações.

Conector do SpannerIO do Apache Beam

Este é o conector do SpannerIO descrito anteriormente. Ele é um conector de E/S de origem que emite um PCollection de registros de alteração de dados para estágios posteriores do pipeline. O horário do evento para cada registro de alteração de dados emitido será o carimbo de data/hora de confirmação. Os registros emitidos são não ordenados, e o conector do SpannerIO garante que não haverá registros atrasados.

Ao trabalhar com fluxo de alterações, o Dataflow usa um checkpoint. Como resultado, cada worker pode aguardar até cinco segundos enquanto armazena as alterações em buffer antes de enviá-las para processamento adicional. Espera-se uma latência de aproximadamente seis segundos.

Transformações definidas pelo usuário

Uma transformação definida pelo usuário permite que o usuário agregue, transforme ou modifique dados de processamento em um pipeline do Dataflow. Casos de uso comuns para isso são a remoção de informações de identificação pessoal, o cumprimento dos requisitos de formato de dados downstream e a classificação. Consulte a documentação oficial do Apache Beam para ver o guia de programação sobre transforms.

Gravador de E/S do coletor do Apache Beam

O Apache Beam contém transformações integradas de E/S que podem ser usadas para gravar de um pipeline do Dataflow em um coletor de dados como o BigQuery. A maioria dos coletores de dados tem compatibilidade nativa.

Modelos do Dataflow

Os modelos do Dataflow oferecem uma maneira fácil de criar jobs do Dataflow com base em imagens do Docker predefinidas para casos de uso comuns por meio do console do Google Cloud, da CLI do Google Cloud ou de chamadas da API REST.

Para os fluxo de alterações do Spanner, fornecemos três modelos Flex do Dataflow:

crie um pipeline do Dataflow

Nesta seção, abordamos a configuração inicial do conector e fornecemos exemplos de integrações comuns com a funcionalidade de fluxo de alterações do Spanner.

Para seguir estas etapas, você precisa de um ambiente de desenvolvimento em Java para o Dataflow. Para mais informações, acesse Criar um pipeline do Dataflow usando Java.

Criar um stream de alterações

Para detalhes sobre como criar um fluxo de alterações, consulte Criar um fluxo de alterações. Para continuar com as próximas etapas, você precisa ter um banco de dados do Spanner com um fluxo de alterações configurado.

Conceder privilégios de controle de acesso refinados

Se você espera que qualquer usuário de controle de acesso detalhado execute o job do Dataflow, verifique se eles têm acesso a um papel de banco de dados com o privilégio SELECT no fluxo de alterações e o privilégio EXECUTE na função com valor de tabela do fluxo de alterações. Além disso, verifique se o principal especifica o papel do banco de dados na configuração do SpannerIO ou no modelo flexível do Dataflow.

Para mais informações, consulte Sobre controle de acesso detalhado.

Adicionar o conector do SpannerIO como uma dependência

O conector do Apache Beam SpannerIO encapsula a complexidade de consumir os fluxos de alterações diretamente pela API Cloud Spanner, emitindo uma PCollection de registros de dados de fluxo de alterações para estágios posteriores do pipeline.

Esses objetos podem ser consumidos em outros estágios do pipeline do Dataflow do usuário. A integração do fluxo de alterações faz parte do conector do SpannerIO. Para usar o conector SpannerIO, a dependência precisa ser adicionada ao arquivo pom.xml:

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
  <version>${beam-version}</version> <!-- available from version 2.38.0 -->
</dependency>

Criar um banco de dados de metadados

O conector precisa rastrear cada partição ao executar o pipeline do Apache Beam. Ele mantém esses metadados em uma tabela do Spanner criada pelo conector durante a inicialização. Você especifica o banco de dados em que essa tabela será criada durante a configuração do conector.

Conforme descrito em Práticas recomendadas de fluxos de alteração, recomendamos que você crie um novo banco de dados para essa finalidade, em vez de permitir que o conector use o banco de dados do aplicativo para armazenar a tabela de metadados dele.

O proprietário de um job do Dataflow que usa o conector SpannerIO precisa ter as seguintes permissões do IAM definidas com esse banco de dados de metadados:

  • spanner.databases.updateDdl
  • spanner.databases.beginReadOnlyTransaction
  • spanner.databases.beginOrRollbackReadWriteTransaction
  • spanner.databases.read
  • spanner.databases.select
  • spanner.databases.write
  • spanner.sessions.create
  • spanner.sessions.get

Configurar o conector

O conector de fluxo de alterações do Spanner pode ser configurado da seguinte maneira:

SpannerConfig spannerConfig = SpannerConfig
  .create()
  .withProjectId("my-project-id")
  .withInstanceId("my-instance-id")
  .withDatabaseId("my-database-id")
  .withDatabaseRole("my-database-role");    // Needed for fine-grained access control only

Timestamp startTime = Timestamp.now();
Timestamp endTime = Timestamp.ofTimeSecondsAndNanos(
   startTime.getSeconds() + (10 * 60),
   startTime.getNanos()
);

SpannerIO
  .readChangeStream()
  .withSpannerConfig(spannerConfig)
  .withChangeStreamName("my-change-stream")
  .withMetadataInstance("my-meta-instance-id")
  .withMetadataDatabase("my-meta-database-id")
  .withMetadataTable("my-meta-table-name")
  .withRpcPriority(RpcPriority.MEDIUM)
  .withInclusiveStartAt(startTime)
  .withInclusiveEndAt(endTime);

Confira a seguir as descrições das opções de readChangeStream():

Configuração do Spanner (obrigatório)

Usado para configurar o projeto, a instância e o banco de dados em que o fluxo de alterações foi criado e precisa ser consultado. Também pode especificar o papel do banco de dados a ser usado quando o principal do IAM que está executando o job do Dataflow é um usuário de controle de acesso detalhado. O job assume esse papel de banco de dados para acessar o fluxo de alterações. Para mais informações, consulte Sobre controle de acesso detalhado.

Nome do fluxo de alterações (obrigatório)

Esse nome identifica o fluxo de alterações de maneira exclusiva. O nome precisa ser o mesmo usado na criação.

ID da instância de metadados (opcional)

Esta é a instância que armazena os metadados usados pelo conector para controlar o consumo dos dados da API do fluxo de alterações.

ID do banco de dados de metadados (obrigatório)

Esse é o banco de dados que armazena os metadados usados pelo conector para controlar o consumo dos dados da API do fluxo de alterações.

Nome da tabela de metadados (opcional)

Só use esse recurso ao atualizar um pipeline atual.

Esse é o nome da tabela de metadados preexistente a ser usado pelo conector. Usado pelo conector para armazenar os metadados e controlar o consumo dos dados da API do fluxo de alterações. Se essa opção for omitida, o Spanner criará uma nova tabela com um nome gerado na inicialização do conector.

Prioridade de RPC (opcional)

A prioridade de solicitação a ser usada para as consultas de fluxo de alterações. Se esse parâmetro for omitido, high priority será usado.

InclusiveStartAt (obrigatório)

As alterações do carimbo de data/hora fornecido são retornadas ao autor da chamada.

InclusiveEndAt (opcional)

As alterações até o carimbo de data/hora especificado são retornadas ao autor da chamada. Se esse parâmetro for omitido, as alterações serão emitidas indefinidamente.

Adicionar transformações e coletores para processar dados de alteração

Depois de concluir as etapas anteriores, o conector configurado do SpannerIO está pronto para emitir uma PCollection de objetos DataChangeRecord. Consulte Exemplos de transformações e coletores para ver várias amostras de configurações de pipeline que processam esses dados de streaming de várias maneiras.

Os registros do fluxo de alterações emitidos pelo conector do SpannerIO são desordenados. Isso ocorre porque as PCollections não oferecem garantias de ordenação. Se você precisar de um stream ordenado, agrupe e classifique os registros como transformações nos pipelines. Consulte Amostra: ordenar por chave. É possível estender esse exemplo para classificar os registros com base em qualquer campo dos registros, como por IDs de transação.

Exemplos de transformações e coletores

Você pode definir suas próprias transformações e especificar coletores para gravar os dados. A documentação do Apache Beam fornece uma série de transforms que podem ser aplicadas, além de conectores de E/S prontas para uso na gravação dos dados em sistemas externos.

Amostra: ordenar por chave

Este exemplo de código emite registros de alteração de dados ordenados por carimbo de data/hora de confirmação e agrupados por chaves primárias usando o conector do Dataflow.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new BreakRecordByModFn()))
  .apply(ParDo.of(new KeyByIdFn()))
  .apply(ParDo.of(new BufferKeyUntilOutputTimestamp()))
  // Subsequent processing goes here

Este exemplo de código usa estados e timers para armazenar registros em buffer para cada chave e define o prazo de validade do timer como um horário configurado pelo usuário T no futuro, definido na função BufferKeyUntilOutputTimestamp. Quando a marca d'água do Dataflow passa o tempo T, esse código limpa todos os registros no buffer com carimbo de data/hora menor que T, ordena esses registros por carimbo de data/hora de confirmação e gera um par de chave-valor em que:

  • A chave é a chave de entrada, ou seja, a chave primária com hash em uma matriz de bucket de tamanho 1.000.
  • O valor são os registros de alteração de dados ordenados que foram armazenados em buffer para a chave.

Para cada chave, temos as seguintes garantias:

  • Os timers são acionados conforme o carimbo de data/hora de expiração.
  • Os estágios downstream recebem os elementos na mesma ordem em que foram produzidos.

Por exemplo, digamos que, para uma chave com o valor 100, o timer seja disparado em T1 e T10, respectivamente, produzindo um pacote de registros de mudança de dados em cada carimbo de data/hora. Como os registros de mudança de dados gerados em T1 foram produzidos antes daqueles em T10, os registros de mudança de dados gerados em T1 também vão ser recebidos na próxima etapa, antes dos registros de mudança de dados em T10. Esse mecanismo nos ajuda a garantir a ordem rigorosa do carimbo de data/hora de confirmação por chave primária para processamento downstream.

Esse processo será repetido até o pipeline terminar e todos os registros de alteração de dados forem processados (ou ele será repetido indefinidamente se nenhum horário de término for especificado).

Observe que esse exemplo de código usa estados e timers, em vez de janelas, para ordenar por chave. Não há garantia de que as janelas serão processadas em ordem. Isso significa que janelas mais antigas podem ser processadas depois que as mais recentes, o que pode resultar em processamento fora de ordem.

BreakRecordByModFn

Cada registro de mudança de dados pode conter várias modificações. Cada modificação representa uma inserção, atualização ou exclusão de um único valor de chave primária. Essa função divide cada registro de mudança de dados em registros separados, um por modificação.

private static class BreakRecordByModFn extends DoFn<DataChangeRecord,
                                                     DataChangeRecord>  {
  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record, OutputReceiver<DataChangeRecord>
    outputReceiver) {
    record.getMods().stream()
      .map(
          mod ->
              new DataChangeRecord(
                  record.getPartitionToken(),
                  record.getCommitTimestamp(),
                  record.getServerTransactionId(),
                  record.isLastRecordInTransactionInPartition(),
                  record.getRecordSequence(),
                  record.getTableName(),
                  record.getRowType(),
                  Collections.singletonList(mod),
                  record.getModType(),
                  record.getValueCaptureType(),
                  record.getNumberOfRecordsInTransaction(),
                  record.getNumberOfPartitionsInTransaction(),
                  record.getTransactionTag(),
                  record.isSystemTransaction(),
                  record.getMetadata()))
      .forEach(outputReceiver::output);
  }
}

KeyByIdFn

Essa função recebe um DataChangeRecord e gera um DataChangeRecord codificado pela chave primária do Spanner com hash em um valor inteiro.

private static class KeyByIdFn extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>>  {
  // NUMBER_OF_BUCKETS should be configured by the user to match their key cardinality
  // Here, we are choosing to hash the Spanner primary keys to a bucket index, in order to have a deterministic number
  // of states and timers for performance purposes.
  // Note that having too many buckets might have undesirable effects if it results in a low number of records per bucket
  // On the other hand, having too few buckets might also be problematic, since many keys will be contained within them.
  private static final int NUMBER_OF_BUCKETS = 1000;

  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record,
      OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
    int hashCode = (int) record.getMods().get(0).getKeysJson().hashCode();
    // Hash the received keys into a bucket in order to have a
    // deterministic number of buffers and timers.
    String bucketIndex = String.valueOf(hashCode % NUMBER_OF_BUCKETS);

    outputReceiver.output(KV.of(bucketIndex, record));
  }
}

BufferKeyUntilOutputTimestamp

Temporizadores e buffers são definidos por chave. Essa função armazena em buffer cada registro de alteração de dados até que a marca d'água passe o carimbo de data/hora em que queremos enviar os registros de mudança de dados armazenados em buffer.

Este código utiliza um temporizador de repetição para determinar quando limpar o buffer:

  1. Quando ele vê um registro de alteração de dados de uma chave pela primeira vez, ele define o timer para disparar no carimbo de data/hora de confirmação do registro de alteração de dados + incrementIntervalSeconds (uma opção configurável pelo usuário).
  2. Quando o timer é disparado, ele adiciona a recordsToOutput todos os registros de mudança de dados com carimbo de data/hora menor que o prazo de validade do timer. Se o buffer tiver registros de mudança de dados cujo carimbo de data/hora seja maior ou igual ao tempo de expiração do timer, ele adicionará esses registros de mudança de dados de volta ao buffer em vez de enviá-los. Em seguida, define o próximo timer para o prazo de validade atual, mais incrementIntervalInSeconds.
  3. Se recordsToOutput não estiver vazio, a função ordena os registros de mudança de dados em recordsToOutput por carimbo de data/hora de confirmação e ID da transação e, em seguida, os gera.
private static class BufferKeyUntilOutputTimestamp extends
    DoFn<KV<String, DataChangeRecord>, KV<String, Iterable<DataChangeRecord>>>  {
  private static final Logger LOG =
      LoggerFactory.getLogger(BufferKeyUntilOutputTimestamp.class);

  private final long incrementIntervalInSeconds = 2;

  private BufferKeyUntilOutputTimestamp(long incrementIntervalInSeconds) {
    this.incrementIntervalInSeconds = incrementIntervalInSeconds;
  }

  @SuppressWarnings("unused")
  @TimerId("timer")
  private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @StateId("buffer")
  private final StateSpec<BagState<DataChangeRecord>> buffer = StateSpecs.bag();

  @StateId("keyString")
  private final StateSpec<ValueState<String>> keyString =
      StateSpecs.value(StringUtf8Coder.of());

  @ProcessElement
  public void process(
      @Element KV<String, DataChangeRecord> element,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @TimerId("timer") Timer timer,
      @StateId("keyString") ValueState<String> keyString) {
    buffer.add(element.getValue());

    // Only set the timer if this is the first time we are receiving a data change
    // record with this key.
    String elementKey = keyString.read();
    if (elementKey == null) {
      Instant commitTimestamp =
          new Instant(element.getValue().getCommitTimestamp().toSqlTimestamp());
      Instant outputTimestamp =
          commitTimestamp.plus(Duration.standardSeconds(incrementIntervalInSeconds));
      timer.set(outputTimestamp);
      keyString.write(element.getKey());
    }
  }

  @OnTimer("timer")
  public void onExpiry(
      OnTimerContext context,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @TimerId("timer") Timer timer,
      @StateId("keyString") ValueState<String> keyString) {
    if (!buffer.isEmpty().read()) {
      String elementKey = keyString.read();

      final List<DataChangeRecord> records =
          StreamSupport.stream(buffer.read().spliterator(), false)
              .collect(Collectors.toList());
      buffer.clear();

      List<DataChangeRecord> recordsToOutput = new ArrayList<>();
      for (DataChangeRecord record : records) {
        Instant recordCommitTimestamp =
            new Instant(record.getCommitTimestamp().toSqlTimestamp());
        final String recordString =
            record.getMods().get(0).getNewValuesJson().isEmpty()
                ? "Deleted record"
                : record.getMods().get(0).getNewValuesJson();
        // When the watermark passes time T, this means that all records with
        // event time < T have been processed and successfully committed. Since the
        // timer fires when the watermark passes the expiration time, we should
        // only output records with event time < expiration time.
        if (recordCommitTimestamp.isBefore(context.timestamp())) {
          LOG.info(
             "Outputting record with key {} and value {} at expiration " +
             "timestamp {}",
              elementKey,
              recordString,
              context.timestamp().toString());
          recordsToOutput.add(record);
        } else {
          LOG.info(
              "Expired at {} but adding record with key {} and value {} back to " +
              "buffer due to commit timestamp {}",
              context.timestamp().toString(),
              elementKey,
              recordString,
              recordCommitTimestamp.toString());
          buffer.add(record);
        }
      }

      // Output records, if there are any to output.
      if (!recordsToOutput.isEmpty()) {
        // Order the records in place, and output them. The user would need
        // to implement DataChangeRecordComparator class that sorts the
        // data change records by commit timestamp and transaction ID.
        Collections.sort(recordsToOutput, new DataChangeRecordComparator());
        context.outputWithTimestamp(
            KV.of(elementKey, recordsToOutput), context.timestamp());
        LOG.info(
            "Expired at {}, outputting records for key {}",
            context.timestamp().toString(),
            elementKey);
      } else {
        LOG.info("Expired at {} with no records", context.timestamp().toString());
      }
    }

    Instant nextTimer = context.timestamp().plus(Duration.standardSeconds(incrementIntervalInSeconds));
    if (buffer.isEmpty() != null && !buffer.isEmpty().read()) {
      LOG.info("Setting next timer to {}", nextTimer.toString());
      timer.set(nextTimer);
    } else {
      LOG.info(
          "Timer not being set since the buffer is empty: ");
      keyString.clear();
    }
  }
}

Pedidos de transações

Esse pipeline pode ser alterado para ordenar por ID da transação e carimbo de data/hora de confirmação. Para isso, armazene registros em buffer para cada par de ID da transação/carimbo de data/hora de confirmação, em vez de para cada chave do Spanner. Isso requer a modificação do código em KeyByIdFn.

Amostra: transações de montagem

Esse exemplo de código lê registros de alteração de dados, reúne todos os registros de alteração de dados pertencentes à mesma transação em um único elemento e gera esse elemento. As transações geradas por esse código de amostra não são ordenadas pelo carimbo de data/hora de confirmação.

Este exemplo de código usa buffers para reunir transações a partir de registros de alteração de dados. Ao receber um registro de alteração de dados pertencente a uma transação pela primeira vez, ele lê o campo numberOfRecordsInTransaction no registro de alteração de dados, que descreve o número esperado de registros de alteração de dados pertencentes a essa transação. Ele armazena em buffer os registros de mudança de dados pertencentes a essa transação até que o número de registros em buffer corresponda a numberOfRecordsInTransaction, em que gera os registros de mudança de dados agrupados.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new KeyByTransactionIdFn()))
  .apply(ParDo.of(new TransactionBoundaryFn()))
  // Subsequent processing goes here

KeyByTransactionIdFn

Essa função recebe um DataChangeRecord e gera um DataChangeRecord codificado pelo ID da transação.

private static class KeyByTransactionIdFn extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>>  {
  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record,
      OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
    outputReceiver.output(KV.of(record.getServerTransactionId(), record));
  }
}

TransactionBoundaryFn

Os buffers de TransactionBoundaryFn receberam pares de chave-valor de {TransactionId, DataChangeRecord} de KeyByTransactionIdFn e os armazenam em buffer em grupos baseados em TransactionId. Quando o número de registros armazenados em buffer é igual ao número de registros contidos em toda a transação, essa função classifica os objetos DataChangeRecord no grupo por sequência de registro e gera um par de chave-valor de {CommitTimestamp, TransactionId}, Iterable<DataChangeRecord>.

Aqui, presumimos que SortKey é uma classe definida pelo usuário que representa um par de {CommitTimestamp, TransactionId}. Confira um exemplo de implementação para SortKey.

private static class TransactionBoundaryFn extends DoFn<KV<String, DataChangeRecord>, KV<SortKey, Iterable<DataChangeRecord>>>  {
  @StateId("buffer")
  private final StateSpec<BagState<DataChangeRecord>> buffer = StateSpecs.bag();

  @StateId("count")
  private final StateSpec<ValueState<Integer>> countState = StateSpecs.value();

  @ProcessElement
  public void process(
      ProcessContext context,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @StateId("count") ValueState<Integer> countState) {
    final KV<String, DataChangeRecord> element = context.element();
    final DataChangeRecord record = element.getValue();

    buffer.add(record);
    int count = (countState.read() != null ? countState.read() : 0);
    count = count + 1;
    countState.write(count);

    if (count == record.getNumberOfRecordsInTransaction()) {
      final List<DataChangeRecord> sortedRecords =
          StreamSupport.stream(buffer.read().spliterator(), false)
              .sorted(Comparator.comparing(DataChangeRecord::getRecordSequence))
              .collect(Collectors.toList());

      final Instant commitInstant =
          new Instant(sortedRecords.get(0).getCommitTimestamp().toSqlTimestamp()
              .getTime());
      context.outputWithTimestamp(
          KV.of(
              new SortKey(sortedRecords.get(0).getCommitTimestamp(),
                          sortedRecords.get(0).getServerTransactionId()),
              sortedRecords),
          commitInstant);
      buffer.clear();
      countState.clear();
    }
  }
}

Amostra: filtrar por tag de transação

Quando uma transação que modifica os dados do usuário é marcada, a tag correspondente e o tipo dela são armazenados como parte do DataChangeRecord. Estes exemplos demonstram como filtrar os registros de fluxo de alterações com base em tags de transação definidas pelo usuário e tags do sistema:

Filtragem de tags definida pelo usuário para my-tx-tag:

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(Filter.by(record ->
           !record.isSystemTransaction()
           && record.getTransactionTag().equalsIgnoreCase("my-tx-tag")))
  // Subsequent processing goes here

Auditoria de filtragem de tags do sistema/TTL:

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(Filter.by(record ->
           record.isSystemTransaction()
           && record.getTransactionTag().equals("RowDeletionPolicy")))
  // Subsequent processing goes here

Amostra: buscar a linha completa

Este exemplo funciona com uma tabela do Spanner chamada Singer que tem a seguinte definição:

CREATE TABLE Singers (
  SingerId INT64 NOT NULL,
  FirstName STRING(1024),
  LastName STRING(1024)
) PRIMARY KEY (SingerId);

No modo de captura de valor OLD_AND_NEW_VALUES padrão dos fluxo de alterações, quando há uma atualização em uma linha do Spanner, o registro de alterações de dados recebido conterá apenas as colunas que foram alteradas. As colunas rastreadas, mas inalteradas, não serão incluídas no registro. A chave primária da modificação pode ser usada para fazer uma leitura de snapshot do Spanner no carimbo de data/hora de confirmação do registro de mudança de dados para buscar as colunas inalteradas ou até mesmo recuperar a linha completa.

Observe que a política de retenção do banco de dados pode precisar ser alterada para um valor maior ou igual à política de retenção do fluxo de alterações para que a leitura do snapshot seja bem-sucedida.

Observe também que o uso do tipo de captura de valor NEW_ROW é a maneira recomendada e mais eficiente de fazer isso, já que ele retorna todas as colunas rastreadas da linha por padrão e não exige uma leitura extra de snapshot no Spanner.

SpannerConfig spannerConfig = SpannerConfig
   .create()
   .withProjectId("my-project-id")
   .withInstanceId("my-instance-id")
   .withDatabaseId("my-database-id")
   .withDatabaseRole("my-database-role");   // Needed for fine-grained access control only

pipeline
   .apply(SpannerIO
       .readChangeStream()
       .withSpannerConfig(spannerConfig)
       // Assume we have a change stream "my-change-stream" that watches Singers table.
       .withChangeStreamName("my-change-stream")
       .withMetadataInstance("my-metadata-instance-id")
       .withMetadataDatabase("my-metadata-database-id")
       .withInclusiveStartAt(Timestamp.now()))
   .apply(ParDo.of(new ToFullRowJsonFn(spannerConfig)))
   // Subsequent processing goes here

ToFullRowJsonFn

Essa transformação vai executar uma leitura desatualizada no carimbo de data/hora de confirmação de cada registro recebido e mapear a linha completa para JSON.

public class ToFullRowJsonFn extends DoFn<DataChangeRecord, String> {
 // Since each instance of this DoFn will create its own session pool and will
 // perform calls to Spanner sequentially, we keep the number of sessions in
 // the pool small. This way, we avoid wasting resources.
 private static final int MIN_SESSIONS = 1;
 private static final int MAX_SESSIONS = 5;
 private final String projectId;
 private final String instanceId;
 private final String databaseId;

 private transient DatabaseClient client;
 private transient Spanner spanner;

 public ToFullRowJsonFn(SpannerConfig spannerConfig) {
   this.projectId = spannerConfig.getProjectId().get();
   this.instanceId = spannerConfig.getInstanceId().get();
   this.databaseId = spannerConfig.getDatabaseId().get();
 }

 @Setup
 public void setup() {
   SessionPoolOptions sessionPoolOptions = SessionPoolOptions
      .newBuilder()
      .setMinSessions(MIN_SESSIONS)
      .setMaxSessions(MAX_SESSIONS)
      .build();
   SpannerOptions options = SpannerOptions
       .newBuilder()
       .setProjectId(projectId)
       .setSessionPoolOption(sessionPoolOptions)
       .build();
   DatabaseId id = DatabaseId.of(projectId, instanceId, databaseId);
   spanner = options.getService();
   client = spanner.getDatabaseClient(id);
 }

 @Teardown
 public void teardown() {
   spanner.close();
 }

 @ProcessElement
 public void process(
   @Element DataChangeRecord element,
   OutputReceiver<String> output) {
   com.google.cloud.Timestamp commitTimestamp = element.getCommitTimestamp();
   element.getMods().forEach(mod -> {
     JSONObject keysJson = new JSONObject(mod.getKeysJson());
     JSONObject newValuesJson = new JSONObject(mod.getNewValuesJson());
     ModType modType = element.getModType();
     JSONObject jsonRow = new JSONObject();
     long singerId = keysJson.getLong("SingerId");
     jsonRow.put("SingerId", singerId);
     if (modType == ModType.INSERT) {
       // For INSERT mod, get non-primary key columns from mod.
       jsonRow.put("FirstName", newValuesJson.get("FirstName"));
       jsonRow.put("LastName", newValuesJson.get("LastName"));
     } else if (modType == ModType.UPDATE) {
       // For UPDATE mod, get non-primary key columns by doing a snapshot read using the primary key column from mod.
       try (ResultSet resultSet = client
         .singleUse(TimestampBound.ofReadTimestamp(commitTimestamp))
         .read(
           "Singers",
           KeySet.singleKey(com.google.cloud.spanner.Key.of(singerId)),
             Arrays.asList("FirstName", "LastName"))) {
         if (resultSet.next()) {
           jsonRow.put("FirstName", resultSet.isNull("FirstName") ?
             JSONObject.NULL : resultSet.getString("FirstName"));
           jsonRow.put("LastName", resultSet.isNull("LastName") ?
             JSONObject.NULL : resultSet.getString("LastName"));
         }
       }
     } else {
       // For DELETE mod, there is nothing to do, as we already set SingerId.
     }

     output.output(jsonRow.toString());
   });
 }
}

Esse código cria um cliente de banco de dados do Spanner para realizar a busca de linhas completas e configura o pool de sessões para ter apenas algumas sessões, realizando leituras em uma instância do ToFullRowJsonFn sequencialmente. O Dataflow vai gerar muitas instâncias dessa função, cada uma com o próprio pool de clientes.

Amostra: Spanner para o Pub/Sub

Nesse cenário, o autor da chamada transmite registros para o Pub/Sub o mais rápido possível, sem qualquer agrupamento ou agregação. Isso é uma boa opção para acionar o processamento downstream, como fazer streaming de todas as novas linhas inseridas em uma tabela do Spanner para o Pub/Sub para processamento adicional.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(MapElements.into(TypeDescriptors.strings()).via(Object::toString))
  .apply(PubsubIO.writeStrings().to("my-topic"));

Observe que o coletor do Pub/Sub pode ser configurado para garantir semântica apenas uma vez.

Amostra: Spanner para o Cloud Storage

Nesse cenário, o autor da chamada agrupa todos os registros em uma determinada janela e salva o grupo em arquivos separados do Cloud Storage. É uma boa opção para análise e arquivamento pontual, que são independentes do período de armazenamento do Spanner.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(MapElements.into(TypeDescriptors.strings()).via(Object::toString))
  .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
  .apply(TextIO
    .write()
    .to("gs://my-bucket/change-stream-results-")
    .withSuffix(".txt")
    .withWindowedWrites()
    .withNumShards(1));

Observe que o coletor do Cloud Storage fornece semântica pelo menos uma vez por padrão. Com processamento extra, ela pode ser modificada para ter semântica exatamente uma vez.

Também fornecemos um modelo do Dataflow para esse caso de uso: consulte Conectar fluxo de alterações ao Cloud Storage.

Amostra: Spanner para o BigQuery (tabela de razão)

Aqui, o autor da chamada transmite registros alterados para o BigQuery. Cada registro de alteração de dados é refletido como uma linha no BigQuery. Essa é uma boa opção para análises. Esse código usa as funções definidas anteriormente, na seção Buscar linha completa, para recuperar a linha completa do registro e gravá-la no BigQuery.

SpannerConfig spannerConfig = SpannerConfig
  .create()
  .withProjectId("my-project-id")
  .withInstanceId("my-instance-id")
  .withDatabaseId("my-database-id")
  .withDatabaseRole("my-database-role");   // Needed for fine-grained access control only

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(spannerConfig)
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new ToFullRowJsonFn(spannerConfig)))
  .apply(BigQueryIO
    .<String>write()
    .to("my-bigquery-table")
    .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
    .withWriteDisposition(Write.WriteDisposition.WRITE_APPEND)
    .withSchema(new TableSchema().setFields(Arrays.asList(
      new TableFieldSchema()
        .setName("SingerId")
        .setType("INT64")
        .setMode("REQUIRED"),
      new TableFieldSchema()
        .setName("FirstName")
        .setType("STRING")
        .setMode("REQUIRED"),
      new TableFieldSchema()
        .setName("LastName")
        .setType("STRING")
        .setMode("REQUIRED")
    )))
    .withAutoSharding()
    .optimizedWrites()
    .withFormatFunction((String element) -> {
      ObjectMapper objectMapper = new ObjectMapper();
      JsonNode jsonNode = null;
      try {
        jsonNode = objectMapper.readTree(element);
      } catch (IOException e) {
        e.printStackTrace();
      }
      return new TableRow()
        .set("SingerId", jsonNode.get("SingerId").asInt())
        .set("FirstName", jsonNode.get("FirstName").asText())
        .set("LastName", jsonNode.get("LastName").asText());
    }
  )
);

Observe que o coletor do BigQuery fornece semântica pelo menos uma vez por padrão. Com processamento extra, ela pode ser modificada para ter semântica exatamente uma vez.

Também fornecemos um modelo do Dataflow para esse caso de uso. Consulte Conectar fluxo de alterações ao BigQuery.

Monitorar um pipeline

Há duas classes de métricas disponíveis para monitorar um pipeline do Dataflow de fluxo de alterações.

Métricas padrão do Dataflow

O Dataflow fornece várias métricas para garantir a integridade do job, como atualização de dados, atraso do sistema, capacidade do job, uso da CPU do worker e muito mais. Para mais informações, consulte Como usar o Monitoring para pipelines do Dataflow.

Para pipelines de fluxo de alterações, há duas métricas principais que precisam ser consideradas: a latência do sistema e a atualização de dados.

A latência do sistema informará a duração máxima atual (em segundos) pelo qual um item de dados é processado ou aguardando processamento.

A atualização de dados mostrará o tempo entre agora (tempo real) e a marca-d'água de saída. A marca-d'água de saída do tempo T indica que todos os elementos com um horário de evento (estritamente) antes de T foram processados para computação. Em outras palavras, a atualização de dados mede o quanto o pipeline está atualizado em relação ao processamento dos eventos recebidos.

Se o pipeline estiver com poucos recursos, será possível notar esse efeito nessas duas métricas. A latência do sistema vai aumentar, porque os itens precisam esperar mais tempo antes de serem processados. A atualização de dados também vai aumentar, porque o pipeline não será capaz de acompanhar a quantidade de dados recebidos.

Métricas personalizadas do fluxo de alterações

Essas métricas são expostas no Cloud Monitoring e incluem:

  • Latência em classes (histograma) entre um registro que está sendo confirmado no Spanner para ele ser emitido em uma PCollection pelo conector. Essa métrica pode ser usada para ver problemas de desempenho (latência) do pipeline.
  • Número total de registros de dados lidos. Essa é uma indicação geral do número de registros emitidos pelo conector. Esse número precisa aumentar constantemente, espelhando a tendência das gravações no banco de dados do Spanner.
  • Número de partições que estão sendo lidas. É preciso sempre haver partições sendo lidas. Se esse número for zero, significa que ocorreu um erro no pipeline.
  • Número total de consultas emitidas durante a execução do conector. Essa é uma indicação geral das consultas de fluxo de alterações feitas à instância do Spanner durante toda a execução do pipeline. Ele pode ser usado para estimar a carga do conector para o banco de dados do Spanner.

Atualizar um pipeline

É possível atualizar um pipeline em execução que usa o conector do SpannerIO para processar fluxo de alterações se as verificações de compatibilidade do job forem aprovadas. Para isso, é necessário definir explicitamente o parâmetro do nome da tabela de metadados do novo job ao atualizá-lo. Use o valor da opção de pipeline metadataTable do job que você está atualizando.

Se você estiver usando um modelo do Dataflow fornecido pelo Google, defina o nome da tabela usando o parâmetro spannerMetadataTableName. Também é possível modificar o job atual para usar explicitamente a tabela de metadados com o método withMetadataTable(your-metadata-table-name) na configuração do conector. Depois disso, siga as instruções em Como iniciar seu job substituto nos documentos do Dataflow para atualizar um job em execução.

Práticas recomendadas para fluxo de alterações e o Dataflow

Veja a seguir algumas práticas recomendadas para criar conexões de fluxo de alterações usando o Dataflow.

Usar um banco de dados de metadados separado

Recomendamos que você crie um banco de dados separado para o conector do SpannerIO usar para armazenamento de metadados, em vez de configurá-lo para usar o banco de dados do aplicativo.

Para mais informações, consulte Considerar um banco de dados de metadados separado.

Dimensionar o cluster

Uma regra geral para um número inicial de workers em um job de fluxo de alterações do Spanner é um worker a cada 1.000 gravações por segundo. Essa estimativa pode variar dependendo de vários fatores, como o tamanho de cada transação, quantos registros de fluxo de alterações são produzidos a partir de uma única transação e outras transformações, agregações ou coletores que estão sendo usados no pipeline.

Após o recurso inicial, é importante acompanhar as métricas mencionadas em Monitorar um pipeline para garantir que ele esteja íntegro. Recomendamos testar um tamanho inicial do pool de workers e monitorar como o pipeline lida com a carga, aumentando o número de nós, se necessário. A utilização da CPU é uma métrica essencial para verificar se a carga está adequada e se são necessários mais nós.

Limitações conhecidas

Escalonamento automático

O suporte de escalonamento automático para qualquer pipeline que inclua SpannerIO.readChangeStream requer o Apache Beam 2.39.0 ou superior.

Se você usar uma versão do Apache Beam anterior a 2.39.0, os pipelines que incluem SpannerIO.readChangeStream precisarão especificar explicitamente o algoritmo de escalonamento automático como NONE, conforme descrito em Escalonamento automático horizontal.

Para escalonar manualmente um pipeline do Dataflow em vez de usar o escalonamento automático, consulte Como escalonar manualmente um pipeline de streaming.

Corredor V2

O conector de fluxo de alterações do Spanner requer o Dataflow Runner V2. É necessário especificar manualmente durante a execução. Caso contrário, um erro será gerado. É possível especificar Runner V2 configurando seu job com --experiments=use_unified_worker,use_runner_v2.

Snapshot

O conector de fluxo de alterações do Spanner não é compatível com snapshots do Dataflow.

Reduzindo

O conector de fluxo de alterações do Spanner não é compatível com a drenagem de um job. Só é possível cancelar um job que já existe.

Também é possível atualizar um pipeline atual sem precisar interrompê-lo.

OpenCensus

Para usar o OpenCensus para monitorar o pipeline, especifique a versão 0.28.3 ou posterior.

NullPointerException no início do pipeline

Um bug na versão 2.38.0 do Apache Beam pode causar uma NullPointerException ao iniciar o pipeline em determinadas condições. Isso impede que o job seja iniciado e mostra esta mensagem de erro:

java.lang.NullPointerException: null value in entry: Cloud Storage_PROJECT_ID=null

Para resolver esse problema, use a versão 2.39.0 ou posterior do Apache Beam ou especifique manualmente a versão de beam-sdks-java-core como 2.37.0:

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-core</artifactId>
  <version>2.37.0</version>
</dependency>

Mais informações