Criar conexões de fluxo de alterações usando o Dataflow

Nesta página, demonstramos como criar pipelines do Dataflow que consomem e encaminham dados alterados do Spanner usando fluxos de alterações. Use o código de exemplo desta página para criar pipelines personalizados.

Principais conceitos

Veja a seguir alguns dos principais conceitos dos pipelines do Dataflow para fluxo de alterações.

Dataflow

O Dataflow é um serviço sem servidor, rápido e econômico que oferece suporte ao processamento de stream e em lote. Ele fornece portabilidade com jobs de processamento gravados usando as bibliotecas de código aberto do Apache Beam (em inglês) e automatiza o provisionamento de infraestrutura e o gerenciamento de cluster. O Dataflow oferece streaming quase em tempo real durante a leitura de fluxo de alterações.

É possível usar o Dataflow para consumir os fluxos de alterações do Spanner com o conector SpannerIO (em inglês), que oferece uma abstração sobre a API Spanner para consultar os fluxo de alterações. Com esse conector, não é necessário gerenciar o ciclo de vida da partição dos fluxo de alterações, que é necessário ao usar a API Spanner diretamente. O conector fornece um fluxo de registros de alteração de dados para que você possa se concentrar mais na lógica do aplicativo e menos em detalhes específicos da API e no particionamento de fluxo de alterações dinâmico. Recomendamos usar o conector SpannerIO em vez da API Spanner na maioria das circunstâncias em que você precisa ler dados de fluxo de alterações.

Os modelos do Dataflow são pipelines pré-criados que implementam casos de uso comuns. Consulte os modelos do Dataflow para ter uma visão geral.

Pipeline do Dataflow

Um pipeline do Dataflow de fluxo de alterações do Spanner é composto de quatro partes principais:

  1. Um banco de dados do Spanner com um fluxo de alterações
  2. O conector SpannerIO
  3. Transformações e coletores definidos pelo usuário
  4. um gravador de E/S de coletor

imagem

Cada uma delas é discutida em mais detalhes abaixo.

Fluxo de alterações do Spanner

Para detalhes sobre como criar um fluxo de alterações, consulte Criar um fluxo de alterações.

Conector SpannerIO do Apache Beam

Esse é o conector SpannerIO descrito anteriormente. Ele é um conector de E/S de origem que emite um PCollection de registros de alteração de dados para os estágios posteriores do pipeline. O horário do evento para cada registro de alteração de dados emitido será o carimbo de data/hora de confirmação. Observe que os registros emitidos são não ordenados e que o conector do SpannerIO garante que não haverá registros atrasados.

Ao trabalhar com fluxo de alterações, o Dataflow usa checkpoint. Como resultado, cada worker pode aguardar até o intervalo de checkpoint configurado para armazenar em buffer as alterações antes de enviá-las para processamento adicional.

Transformações definidas pelo usuário

Uma transformação definida pelo usuário permite que um usuário agregue, transforme ou modifique dados de processamento em um pipeline do Dataflow. Casos de uso comuns para isso são a remoção de informações de identificação pessoal, o cumprimento dos requisitos de formato de dados downstream e a classificação. Consulte a documentação oficial do Apache Beam para ver o guia de programação sobre transforms.

Gravador de E/S do coletor do Apache Beam

O Apache Beam contém transformações integradas de E/S que podem ser usadas para gravar de um pipeline do Dataflow em um coletor de dados como o BigQuery. Os coletores de dados mais comuns têm suporte nativo.

Modelos do Dataflow

Os modelos do Dataflow fornecem uma maneira fácil de criar jobs do Dataflow com base em imagens do Docker pré-criadas para casos de uso comuns por meio do console do Google Cloud, da CLI do Google Cloud ou de chamadas da API REST.

Para os fluxo de alterações do Spanner, fornecemos três modelos flexíveis do Dataflow:

criar um pipeline do Dataflow

Nesta seção, abordamos a configuração inicial do conector e fornecemos exemplos de integrações comuns com a funcionalidade de fluxo de alterações do Spanner.

Para seguir estas etapas, você precisa de um ambiente de desenvolvimento Java para o Dataflow. Para mais informações, acesse Criar um pipeline do Dataflow usando Java.

Criar um stream de alterações

Para saber detalhes sobre como criar um fluxo de alterações, consulte Criar um fluxo de alterações. Para continuar com as próximas etapas, você precisa ter um banco de dados do Spanner com um fluxo de alterações configurado.

Conceder privilégios de controle de acesso refinados

Se você espera que qualquer usuário de controle de acesso refinado execute o job do Dataflow, garanta que eles tenham acesso a um papel de banco de dados que tenha o privilégio SELECT no fluxo de alterações e o privilégio EXECUTE na função com valor de tabela do fluxo de alterações. Verifique também se o principal especifica o papel do banco de dados na configuração SpannerIO ou no modelo flexível do Dataflow.

Para mais informações, consulte Sobre o controle de acesso detalhado.

Adicionar o conector SpannerIO como uma dependência

O conector SpannerIO do Apache Beam encapsula a complexidade de consumir os fluxos de alterações diretamente pela API Cloud Spanner, emitindo uma PCollection de registros de dados do fluxo de alterações para os estágios posteriores do pipeline.

Esses objetos podem ser consumidos em outros estágios do pipeline do Dataflow do usuário. A integração do fluxo de alterações faz parte do conector SpannerIO. Para usar o conector SpannerIO, a dependência precisa ser adicionada ao arquivo pom.xml:

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
  <version>${beam-version}</version> <!-- available from version 2.38.0 -->
</dependency>

Criar um banco de dados de metadados

O conector precisa rastrear cada partição ao executar o pipeline do Apache Beam. Ele mantém esses metadados em uma tabela do Spanner criada pelo conector durante a inicialização. Você especifica o banco de dados em que essa tabela será criada durante a configuração do conector.

Conforme descrito em Práticas recomendadas de streams de alterações, recomendamos criar um novo banco de dados para essa finalidade, em vez de permitir que o conector use o banco de dados do aplicativo para armazenar a tabela de metadados dele.

O proprietário de um job do Dataflow que usa o conector SpannerIO precisa ter as seguintes permissões do IAM definidas com esse banco de dados de metadados:

  • spanner.databases.updateDdl
  • spanner.databases.beginReadOnlyTransaction
  • spanner.databases.beginOrRollbackReadWriteTransaction
  • spanner.databases.read
  • spanner.databases.select
  • spanner.databases.write
  • spanner.sessions.create
  • spanner.sessions.get

Configurar o conector

O conector de fluxo de alterações do Spanner pode ser configurado da seguinte maneira:

SpannerConfig spannerConfig = SpannerConfig
  .create()
  .withProjectId("my-project-id")
  .withInstanceId("my-instance-id")
  .withDatabaseId("my-database-id")
  .withDatabaseRole("my-database-role");    // Needed for fine-grained access control only

Timestamp startTime = Timestamp.now();
Timestamp endTime = Timestamp.ofTimeSecondsAndNanos(
   startTime.getSeconds() + (10 * 60),
   startTime.getNanos()
);

SpannerIO
  .readChangeStream()
  .withSpannerConfig(spannerConfig)
  .withChangeStreamName("my-change-stream")
  .withMetadataInstance("my-meta-instance-id")
  .withMetadataDatabase("my-meta-database-id")
  .withMetadataTable("my-meta-table-name")
  .withRpcPriority(RpcPriority.MEDIUM)
  .withInclusiveStartAt(startTime)
  .withInclusiveEndAt(endTime);

Confira a seguir descrições das opções de readChangeStream():

Configuração do Spanner (obrigatório)

Usado para configurar o projeto, a instância e o banco de dados em que o fluxo de alterações foi criado e deve ser consultado. Opcionalmente, também especifica o papel de banco de dados a ser usado quando o principal do IAM que está executando o job do Dataflow é um usuário de controle de acesso refinado. O job assume esse papel de banco de dados para acessar o fluxo de alterações. Para mais informações, consulte Sobre o controle de acesso detalhado.

Nome do fluxo de alterações (obrigatório)

Esse nome identifica exclusivamente o fluxo de alterações. Esse nome precisa ser o mesmo usado na criação.

ID da instância de metadados (opcional)

Essa instância armazena os metadados usados pelo conector para controlar o consumo dos dados da API do fluxo de alterações.

ID do banco de dados de metadados (obrigatório)

Esse é o banco de dados para armazenar os metadados usados pelo conector para controlar o consumo dos dados da API do fluxo de alterações.

Nome da tabela de metadados (opcional)

Use isso somente ao atualizar um pipeline atual.

Esse é o nome da tabela de metadados atual a ser usado pelo conector. Ele é usado pelo conector para armazenar os metadados e controlar o consumo dos dados da API do fluxo de alterações. Se essa opção for omitida, o Spanner criará uma nova tabela com um nome gerado na inicialização do conector.

Prioridade de RPC (opcional)

A prioridade da solicitação a ser usada para as consultas de fluxo de alterações. Se esse parâmetro for omitido, high priority será usado.

InclusiveStartAt (obrigatório)

As alterações do carimbo de data/hora fornecido são retornadas ao autor da chamada.

InclusiveEndAt (opcional)

As alterações até o carimbo de data/hora fornecido são retornadas ao autor da chamada. Se esse parâmetro for omitido, as alterações serão emitidas indefinidamente.

Adicionar transformações e coletores para processar dados alterados

Com as etapas anteriores concluídas, o conector SpannerIO configurado está pronto para emitir uma PCollection de objetos DataChangeRecord. Consulte Exemplos de transformações e coletores para ver vários exemplos de configurações de pipeline que processam esses dados transmitidos de várias maneiras.

Observe que os registros de fluxo de alterações emitidos pelo conector SpannerIO não estão ordenados. Isso ocorre porque as PCollections não dão garantias de ordenação. Se você precisar de um stream ordenado, agrupe e classifique os registros como transformações nos pipelines. Consulte Exemplo: ordem por chave. É possível estender essa amostra para classificar os registros com base em qualquer campo dos registros, como por IDs de transação.

Exemplos de transformações e coletores

Você pode definir suas próprias transformações e especificar coletores para gravar os dados. A documentação do Apache Beam fornece uma série de transforms que podem ser aplicadas e prontas para usar conectores de E/S para gravar os dados em sistemas externos.

Amostra: ordenar por chave

Este exemplo de código emite registros de alteração de dados ordenados por carimbo de data/hora de confirmação e agrupados por chaves primárias usando o conector do Dataflow.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new BreakRecordByModFn()))
  .apply(ParDo.of(new KeyByIdFn()))
  .apply(ParDo.of(new BufferKeyUntilOutputTimestamp()))
  // Subsequent processing goes here

Esse exemplo de código usa estados e timers para armazenar registros em buffer para cada chave e define o prazo de validade do timer como um horário T configurado pelo usuário no futuro (definido na função BufferKeyUntilOutputTimestamp). Quando a marca d'água do Dataflow passa T, esse código limpa todos os registros no buffer com carimbo de data/hora menor que T, ordena esses registros por carimbo de data/hora de confirmação e gera um par de chave-valor em que:

  • A chave é a chave de entrada, ou seja, a chave primária com hash para uma matriz de bucket de tamanho 1.000.
  • O valor são os registros de alteração de dados ordenados que foram armazenados em buffer para a chave.

Temos as seguintes garantias para cada chave:

  • Os temporizadores são acionados na ordem do carimbo de data/hora de expiração.
  • Os estágios downstream têm a garantia de receber os elementos na mesma ordem em que foram produzidos.

Por exemplo, digamos que, para uma chave com o valor 100, o timer seja disparado em T1 e T10, respectivamente, produzindo um pacote de registros de alteração de dados em cada carimbo de data/hora. Como os registros de alteração de dados gerados em T1 foram produzidos antes dos registros de alteração de dados gerados em T10, os registros de alteração de dados gerados em T1 também têm a garantia de serem recebidos no próximo estágio, antes dos registros de alteração de dados gerados em T10. Esse mecanismo nos ajuda a garantir a ordem estrita do carimbo de data/hora de confirmação por chave primária para o processamento downstream.

Esse processo será repetido até que o pipeline termine e todos os registros de alteração de dados tenham sido processados. Se não for especificado um horário de término, ele será repetido indefinidamente.

Observe que esse exemplo de código usa estados e timers, em vez de janelas, para executar a ordenação por chave. A lógica é que não há garantia de que as janelas serão processadas em ordem. Isso significa que as janelas mais antigas podem ser processadas mais tarde do que as mais recentes, o que pode resultar em um processamento fora de ordem.

BreakRecordByModFn

Cada registro de mudança de dados pode conter várias modificações. Cada modificação representa uma inserção, atualização ou exclusão de um único valor de chave primária. Essa função divide cada registro de alteração de dados em registros separados, um por modificação.

private static class BreakRecordByModFn extends DoFn<DataChangeRecord,
                                                     DataChangeRecord>  {
  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record, OutputReceiver<DataChangeRecord>
    outputReceiver) {
    record.getMods().stream()
      .map(
          mod ->
              new DataChangeRecord(
                  record.getPartitionToken(),
                  record.getCommitTimestamp(),
                  record.getServerTransactionId(),
                  record.isLastRecordInTransactionInPartition(),
                  record.getRecordSequence(),
                  record.getTableName(),
                  record.getRowType(),
                  Collections.singletonList(mod),
                  record.getModType(),
                  record.getValueCaptureType(),
                  record.getNumberOfRecordsInTransaction(),
                  record.getNumberOfPartitionsInTransaction(),
                  record.getTransactionTag(),
                  record.isSystemTransaction(),
                  record.getMetadata()))
      .forEach(outputReceiver::output);
  }
}

KeyByIdFn

Essa função recebe um DataChangeRecord e gera um DataChangeRecord codificado pela chave primária do Spanner com hash para um valor inteiro.

private static class KeyByIdFn extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>>  {
  // NUMBER_OF_BUCKETS should be configured by the user to match their key cardinality
  // Here, we are choosing to hash the Spanner primary keys to a bucket index, in order to have a deterministic number
  // of states and timers for performance purposes.
  // Note that having too many buckets might have undesirable effects if it results in a low number of records per bucket
  // On the other hand, having too few buckets might also be problematic, since many keys will be contained within them.
  private static final int NUMBER_OF_BUCKETS = 1000;

  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record,
      OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
    int hashCode = (int) record.getMods().get(0).getKeysJson().hashCode();
    // Hash the received keys into a bucket in order to have a
    // deterministic number of buffers and timers.
    String bucketIndex = String.valueOf(hashCode % NUMBER_OF_BUCKETS);

    outputReceiver.output(KV.of(bucketIndex, record));
  }
}

BufferKeyUntilOutputTimestamp

Timers e buffers são por chave. Essa função armazena em buffer cada registro de alteração de dados até que a marca d'água transmita o carimbo de data/hora em que queremos gerar os registros de alteração de dados em buffer.

Este código utiliza um temporizador de loop para determinar quando esvaziar o buffer:

  1. Ao detectar um registro de alteração de dados de uma chave pela primeira vez, ele define o timer para disparar no carimbo de data/hora de confirmação do registro de alteração de dados + incrementIntervalSeconds (uma opção configurável pelo usuário).
  2. Quando o timer é disparado, ele adiciona a recordsToOutput todos os registros de alteração de dados no buffer com carimbo de data/hora inferior ao prazo de validade do timer. Se o buffer tiver registros de alteração de dados cujo carimbo de data/hora seja maior ou igual ao tempo de expiração do timer, ele adicionará esses registros de alteração de dados de volta ao buffer em vez de enviá-los. Em seguida, ele define o próximo timer como o prazo de validade do timer atual mais incrementIntervalInSeconds.
  3. Se recordsToOutput não estiver vazio, a função ordenará os registros de alteração de dados em recordsToOutput por carimbo de data/hora de confirmação e ID da transação e, em seguida, os enviará.
private static class BufferKeyUntilOutputTimestamp extends
    DoFn<KV<String, DataChangeRecord>, KV<String, Iterable<DataChangeRecord>>>  {
  private static final Logger LOG =
      LoggerFactory.getLogger(BufferKeyUntilOutputTimestamp.class);

  private final long incrementIntervalInSeconds = 2;

  private BufferKeyUntilOutputTimestamp(long incrementIntervalInSeconds) {
    this.incrementIntervalInSeconds = incrementIntervalInSeconds;
  }

  @SuppressWarnings("unused")
  @TimerId("timer")
  private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @StateId("buffer")
  private final StateSpec<BagState<DataChangeRecord>> buffer = StateSpecs.bag();

  @StateId("keyString")
  private final StateSpec<ValueState<String>> keyString =
      StateSpecs.value(StringUtf8Coder.of());

  @ProcessElement
  public void process(
      @Element KV<String, DataChangeRecord> element,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @TimerId("timer") Timer timer,
      @StateId("keyString") ValueState<String> keyString) {
    buffer.add(element.getValue());

    // Only set the timer if this is the first time we are receiving a data change
    // record with this key.
    String elementKey = keyString.read();
    if (elementKey == null) {
      Instant commitTimestamp =
          new Instant(element.getValue().getCommitTimestamp().toSqlTimestamp());
      Instant outputTimestamp =
          commitTimestamp.plus(Duration.standardSeconds(incrementIntervalInSeconds));
      timer.set(outputTimestamp);
      keyString.write(element.getKey());
    }
  }

  @OnTimer("timer")
  public void onExpiry(
      OnTimerContext context,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @TimerId("timer") Timer timer,
      @StateId("keyString") ValueState<String> keyString) {
    if (!buffer.isEmpty().read()) {
      String elementKey = keyString.read();

      final List<DataChangeRecord> records =
          StreamSupport.stream(buffer.read().spliterator(), false)
              .collect(Collectors.toList());
      buffer.clear();

      List<DataChangeRecord> recordsToOutput = new ArrayList<>();
      for (DataChangeRecord record : records) {
        Instant recordCommitTimestamp =
            new Instant(record.getCommitTimestamp().toSqlTimestamp());
        final String recordString =
            record.getMods().get(0).getNewValuesJson().isEmpty()
                ? "Deleted record"
                : record.getMods().get(0).getNewValuesJson();
        // When the watermark passes time T, this means that all records with
        // event time < T have been processed and successfully committed. Since the
        // timer fires when the watermark passes the expiration time, we should
        // only output records with event time < expiration time.
        if (recordCommitTimestamp.isBefore(context.timestamp())) {
          LOG.info(
             "Outputting record with key {} and value {} at expiration " +
             "timestamp {}",
              elementKey,
              recordString,
              context.timestamp().toString());
          recordsToOutput.add(record);
        } else {
          LOG.info(
              "Expired at {} but adding record with key {} and value {} back to " +
              "buffer due to commit timestamp {}",
              context.timestamp().toString(),
              elementKey,
              recordString,
              recordCommitTimestamp.toString());
          buffer.add(record);
        }
      }

      // Output records, if there are any to output.
      if (!recordsToOutput.isEmpty()) {
        // Order the records in place, and output them. The user would need
        // to implement DataChangeRecordComparator class that sorts the
        // data change records by commit timestamp and transaction ID.
        Collections.sort(recordsToOutput, new DataChangeRecordComparator());
        context.outputWithTimestamp(
            KV.of(elementKey, recordsToOutput), context.timestamp());
        LOG.info(
            "Expired at {}, outputting records for key {}",
            context.timestamp().toString(),
            elementKey);
      } else {
        LOG.info("Expired at {} with no records", context.timestamp().toString());
      }
    }

    Instant nextTimer = context.timestamp().plus(Duration.standardSeconds(incrementIntervalInSeconds));
    if (buffer.isEmpty() != null && !buffer.isEmpty().read()) {
      LOG.info("Setting next timer to {}", nextTimer.toString());
      timer.set(nextTimer);
    } else {
      LOG.info(
          "Timer not being set since the buffer is empty: ");
      keyString.clear();
    }
  }
}

Pedidos de transações

Esse pipeline pode ser alterado para ordenar por ID da transação e carimbo de data/hora de confirmação. Para isso, armazene registros em buffer para cada par de ID da transação/carimbo de data/hora de confirmação, em vez de para cada chave do Spanner. Isso requer uma modificação do código em KeyByIdFn.

Amostra: transações de montagem

Esse exemplo de código lê registros de alteração de dados, reúne todos os registros de alteração de dados pertencentes à mesma transação em um único elemento e gera esse elemento. Observe que as transações geradas por este exemplo de código não são ordenadas por carimbo de data/hora de confirmação.

Este exemplo de código usa buffers para montar transações de registros de alteração de dados. Ao receber um registro de alteração de dados pertencente a uma transação pela primeira vez, ele lê o campo numberOfRecordsInTransaction no registro de alteração de dados, que descreve o número esperado de registros de alteração de dados pertencentes a essa transação. Ele armazena em buffer os registros de mudança de dados pertencentes a essa transação até que o número de registros em buffer corresponda a numberOfRecordsInTransaction, o que gera os registros de alteração de dados agrupados.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new KeyByTransactionIdFn()))
  .apply(ParDo.of(new TransactionBoundaryFn()))
  // Subsequent processing goes here

KeyByTransactionIdFn

Essa função recebe um DataChangeRecord e gera um DataChangeRecord codificado pelo ID da transação.

private static class KeyByTransactionIdFn extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>>  {
  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record,
      OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
    outputReceiver.output(KV.of(record.getServerTransactionId(), record));
  }
}

TransactionBoundaryFn

Os buffers de TransactionBoundaryFn recebem pares de chave-valor de {TransactionId, DataChangeRecord} de KeyByTransactionIdFn e os armazena em buffer em grupos com base em TransactionId. Quando o número de registros armazenados em buffer é igual ao número de registros contidos em toda a transação, essa função classifica os objetos DataChangeRecord no grupo por sequência de registro e gera um par de chave-valor de {CommitTimestamp, TransactionId}, Iterable<DataChangeRecord>.

Aqui, estamos supondo que SortKey é uma classe definida pelo usuário que representa um par de {CommitTimestamp, TransactionId}. Consulte um exemplo de implementação de SortKey.

private static class TransactionBoundaryFn extends DoFn<KV<String, DataChangeRecord>, KV<SortKey, Iterable<DataChangeRecord>>>  {
  @StateId("buffer")
  private final StateSpec<BagState<DataChangeRecord>> buffer = StateSpecs.bag();

  @StateId("count")
  private final StateSpec<ValueState<Integer>> countState = StateSpecs.value();

  @ProcessElement
  public void process(
      ProcessContext context,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @StateId("count") ValueState<Integer> countState) {
    final KV<String, DataChangeRecord> element = context.element();
    final DataChangeRecord record = element.getValue();

    buffer.add(record);
    int count = (countState.read() != null ? countState.read() : 0);
    count = count + 1;
    countState.write(count);

    if (count == record.getNumberOfRecordsInTransaction()) {
      final List<DataChangeRecord> sortedRecords =
          StreamSupport.stream(buffer.read().spliterator(), false)
              .sorted(Comparator.comparing(DataChangeRecord::getRecordSequence))
              .collect(Collectors.toList());

      final Instant commitInstant =
          new Instant(sortedRecords.get(0).getCommitTimestamp().toSqlTimestamp()
              .getTime());
      context.outputWithTimestamp(
          KV.of(
              new SortKey(sortedRecords.get(0).getCommitTimestamp(),
                          sortedRecords.get(0).getServerTransactionId()),
              sortedRecords),
          commitInstant);
      buffer.clear();
      countState.clear();
    }
  }
}

Amostra: filtrar por tag de transação

Quando uma transação que modifica dados do usuário é marcada, a tag correspondente e o tipo dela são armazenados como parte de DataChangeRecord. Estes exemplos demonstram como filtrar registros de fluxo de alterações com base em tags de transação definidas pelo usuário e tags do sistema:

Filtragem de tags definidas pelo usuário para my-tx-tag:

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(Filter.by(record ->
           !record.isSystemTransaction()
           && record.getTransactionTag().equalsIgnoreCase("my-tx-tag")))
  // Subsequent processing goes here

Filtragem de tags do sistema/auditoria de TTL:

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(Filter.by(record ->
           record.isSystemTransaction()
           && record.getTransactionTag().equals("RowDeletionPolicy")))
  // Subsequent processing goes here

Amostra: buscar a linha completa

Este exemplo funciona com uma tabela do Spanner chamada Singer que tem a seguinte definição:

CREATE TABLE Singers (
  SingerId INT64 NOT NULL,
  FirstName STRING(1024),
  LastName STRING(1024)
) PRIMARY KEY (SingerId);

No modo de captura de valor padrão OLD_AND_NEW_VALUES dos fluxo de alterações, quando há uma atualização em uma linha do Spanner, o registro de alterações de dados recebido vai conter apenas as colunas que foram alteradas. As colunas rastreadas, mas inalteradas não serão incluídas no registro. A chave primária da modificação pode ser usada para fazer uma leitura de snapshot do Spanner no carimbo de data/hora de confirmação do registro de alteração de dados para buscar as colunas inalteradas ou até mesmo recuperar a linha completa.

Observe que pode ser necessário alterar a política de retenção do banco de dados para um valor maior ou igual à política de retenção do fluxo de alterações para que a leitura do snapshot seja bem-sucedida.

Observe também que o tipo de captura de valor NEW_ROW é a maneira recomendada e mais eficiente de fazer isso, porque ele retorna todas as colunas rastreadas da linha por padrão e não requer uma leitura de snapshot extra no Spanner.

SpannerConfig spannerConfig = SpannerConfig
   .create()
   .withProjectId("my-project-id")
   .withInstanceId("my-instance-id")
   .withDatabaseId("my-database-id")
   .withDatabaseRole("my-database-role");   // Needed for fine-grained access control only

pipeline
   .apply(SpannerIO
       .readChangeStream()
       .withSpannerConfig(spannerConfig)
       // Assume we have a change stream "my-change-stream" that watches Singers table.
       .withChangeStreamName("my-change-stream")
       .withMetadataInstance("my-metadata-instance-id")
       .withMetadataDatabase("my-metadata-database-id")
       .withInclusiveStartAt(Timestamp.now()))
   .apply(ParDo.of(new ToFullRowJsonFn(spannerConfig)))
   // Subsequent processing goes here

ToFullRowJsonFn

Essa transformação vai executar uma leitura desatualizada no carimbo de data/hora de confirmação de cada registro recebido e mapear a linha completa para o JSON.

public class ToFullRowJsonFn extends DoFn<DataChangeRecord, String> {
 // Since each instance of this DoFn will create its own session pool and will
 // perform calls to Spanner sequentially, we keep the number of sessions in
 // the pool small. This way, we avoid wasting resources.
 private static final int MIN_SESSIONS = 1;
 private static final int MAX_SESSIONS = 5;
 private final String projectId;
 private final String instanceId;
 private final String databaseId;

 private transient DatabaseClient client;
 private transient Spanner spanner;

 public ToFullRowJsonFn(SpannerConfig spannerConfig) {
   this.projectId = spannerConfig.getProjectId().get();
   this.instanceId = spannerConfig.getInstanceId().get();
   this.databaseId = spannerConfig.getDatabaseId().get();
 }

 @Setup
 public void setup() {
   SessionPoolOptions sessionPoolOptions = SessionPoolOptions
      .newBuilder()
      .setMinSessions(MIN_SESSIONS)
      .setMaxSessions(MAX_SESSIONS)
      .build();
   SpannerOptions options = SpannerOptions
       .newBuilder()
       .setProjectId(projectId)
       .setSessionPoolOption(sessionPoolOptions)
       .build();
   DatabaseId id = DatabaseId.of(projectId, instanceId, databaseId);
   spanner = options.getService();
   client = spanner.getDatabaseClient(id);
 }

 @Teardown
 public void teardown() {
   spanner.close();
 }

 @ProcessElement
 public void process(
   @Element DataChangeRecord element,
   OutputReceiver<String> output) {
   com.google.cloud.Timestamp commitTimestamp = element.getCommitTimestamp();
   element.getMods().forEach(mod -> {
     JSONObject keysJson = new JSONObject(mod.getKeysJson());
     JSONObject newValuesJson = new JSONObject(mod.getNewValuesJson());
     ModType modType = element.getModType();
     JSONObject jsonRow = new JSONObject();
     long singerId = keysJson.getLong("SingerId");
     jsonRow.put("SingerId", singerId);
     if (modType == ModType.INSERT) {
       // For INSERT mod, get non-primary key columns from mod.
       jsonRow.put("FirstName", newValuesJson.get("FirstName"));
       jsonRow.put("LastName", newValuesJson.get("LastName"));
     } else if (modType == ModType.UPDATE) {
       // For UPDATE mod, get non-primary key columns by doing a snapshot read using the primary key column from mod.
       try (ResultSet resultSet = client
         .singleUse(TimestampBound.ofReadTimestamp(commitTimestamp))
         .read(
           "Singers",
           KeySet.singleKey(com.google.cloud.spanner.Key.of(singerId)),
             Arrays.asList("FirstName", "LastName"))) {
         if (resultSet.next()) {
           jsonRow.put("FirstName", resultSet.isNull("FirstName") ?
             JSONObject.NULL : resultSet.getString("FirstName"));
           jsonRow.put("LastName", resultSet.isNull("LastName") ?
             JSONObject.NULL : resultSet.getString("LastName"));
         }
       }
     } else {
       // For DELETE mod, there is nothing to do, as we already set SingerId.
     }

     output.output(jsonRow.toString());
   });
 }
}

Esse código cria um cliente de banco de dados do Spanner para realizar a busca de linha completa e configura o pool de sessões para ter apenas algumas sessões, realizando leituras em uma instância do ToFullRowJsonFn sequencialmente. O Dataflow gera muitas instâncias dessa função, cada uma com o próprio pool de clientes.

Amostra: do Spanner para o Pub/Sub

Nesse cenário, o autor da chamada transmite registros para o Pub/Sub o mais rápido possível, sem qualquer agrupamento ou agregação. Essa é uma boa opção para acionar o processamento downstream, como fazer streaming de todas as novas linhas inseridas em uma tabela do Spanner para o Pub/Sub para processamento adicional.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(MapElements.into(TypeDescriptors.strings()).via(Object::toString))
  .apply(PubsubIO.writeStrings().to("my-topic"));

O coletor do Pub/Sub pode ser configurado para garantir semântica apenas uma vez.

Amostra: Spanner para Cloud Storage

Nesse cenário, o autor da chamada agrupa todos os registros em uma determinada janela e salva o grupo em arquivos separados do Cloud Storage. Essa é uma boa opção para análises e arquivamento pontual, que é independente do período de armazenamento do Spanner.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(MapElements.into(TypeDescriptors.strings()).via(Object::toString))
  .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
  .apply(TextIO
    .write()
    .to("gs://my-bucket/change-stream-results-")
    .withSuffix(".txt")
    .withWindowedWrites()
    .withNumShards(1));

O coletor do Cloud Storage fornece semântica pelo menos uma vez por padrão. Com processamento extra, ele pode ser modificado para ter semântica exatamente uma vez.

Também fornecemos um modelo do Dataflow para esse caso de uso: consulte Conectar fluxo de alterações ao Cloud Storage.

Amostra: Spanner para BigQuery (tabela de razão)

Aqui, o autor da chamada transmite os registros de alteração para o BigQuery. Cada registro de alteração de dados é refletido como uma linha no BigQuery. Essa é uma boa opção para análises. Esse código usa as funções definidas anteriormente, na seção Buscar linha completa, para recuperar a linha completa do registro e gravá-la no BigQuery.

SpannerConfig spannerConfig = SpannerConfig
  .create()
  .withProjectId("my-project-id")
  .withInstanceId("my-instance-id")
  .withDatabaseId("my-database-id")
  .withDatabaseRole("my-database-role");   // Needed for fine-grained access control only

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(spannerConfig)
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new ToFullRowJsonFn(spannerConfig)))
  .apply(BigQueryIO
    .<String>write()
    .to("my-bigquery-table")
    .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
    .withWriteDisposition(Write.WriteDisposition.WRITE_APPEND)
    .withSchema(new TableSchema().setFields(Arrays.asList(
      new TableFieldSchema()
        .setName("SingerId")
        .setType("INT64")
        .setMode("REQUIRED"),
      new TableFieldSchema()
        .setName("FirstName")
        .setType("STRING")
        .setMode("REQUIRED"),
      new TableFieldSchema()
        .setName("LastName")
        .setType("STRING")
        .setMode("REQUIRED")
    )))
    .withAutoSharding()
    .optimizedWrites()
    .withFormatFunction((String element) -> {
      ObjectMapper objectMapper = new ObjectMapper();
      JsonNode jsonNode = null;
      try {
        jsonNode = objectMapper.readTree(element);
      } catch (IOException e) {
        e.printStackTrace();
      }
      return new TableRow()
        .set("SingerId", jsonNode.get("SingerId").asInt())
        .set("FirstName", jsonNode.get("FirstName").asText())
        .set("LastName", jsonNode.get("LastName").asText());
    }
  )
);

O coletor do BigQuery fornece semântica pelo menos uma vez por padrão. Com processamento extra, ele pode ser modificado para ter semântica exatamente uma vez.

Também fornecemos um modelo do Dataflow para esse caso de uso. Consulte Conectar fluxo de alterações ao BigQuery.

Monitorar um pipeline

Há duas classes de métricas disponíveis para monitorar um pipeline do Dataflow de fluxo de alterações.

Métricas padrão do Dataflow

O Dataflow fornece várias métricas para garantir a integridade do job, como atualização de dados, atraso do sistema, capacidade de processamento do job, uso de CPU do worker e muito mais. Você encontra mais informações em Como usar o Monitoring em pipelines do Dataflow.

Para pipelines de fluxo de alterações, há duas métricas principais que precisam ser consideradas: a latência do sistema e a atualização de dados.

A latência do sistema informará a duração máxima atual (em segundos) em que um item de dados é processado ou está aguardando o processamento.

A atualização de dados vai mostrar o período entre o momento atual (em tempo real) e a marca-d'água de saída. A marca-d'água de saída com o tempo T indica que todos os elementos com um horário de evento (estritamente) antes de T foram processados para cálculo. Em outras palavras, a atualização de dados mede o quanto o pipeline está atualizado em relação ao processamento dos eventos recebidos.

Se houver poucos recursos no pipeline, você verá esse efeito nas duas métricas. A latência do sistema vai aumentar porque os itens precisam esperar mais tempo antes de serem processados. A atualização dos dados também vai aumentar, porque o pipeline não vai conseguir acompanhar a quantidade de dados recebidos.

Métricas personalizadas do fluxo de alterações

Essas métricas são expostas no Cloud Monitoring e incluem:

  • Latência (histograma) em classes entre um registro confirmado no Spanner e a emissão em uma PCollection pelo conector. Essa métrica pode ser usada para verificar problemas de desempenho (latência) do pipeline.
  • Número total de registros de dados lidos. Esta é uma indicação geral do número de registros emitidos pelo conector. Esse número precisa estar sempre aumentando, espelhando a tendência de gravações no banco de dados subjacente do Spanner.
  • Número de partições que estão sendo lidas. É preciso que haja sempre a leitura de partições. Se esse número for zero, significa que ocorreu um erro no pipeline.
  • Número total de consultas emitidas durante a execução do conector. Essa é uma indicação geral das consultas de fluxo de alterações feitas na instância do Spanner durante toda a execução do pipeline. Isso pode ser usado para estimar a carga do conector para o banco de dados do Spanner.

Atualizar um pipeline

É possível atualizar um pipeline em execução que usa o conector SpannerIO para processar fluxo de alterações se as verificações de compatibilidade do job forem aprovadas. Para isso, defina explicitamente o parâmetro de nome da tabela de metadados do novo job ao atualizá-lo. Use o valor da opção de pipeline metadataTable do job que você está atualizando.

Se você estiver usando um modelo do Dataflow fornecido pelo Google, defina o nome da tabela usando o parâmetro spannerMetadataTableName. Também é possível modificar o job atual para usar explicitamente a tabela de metadados com o método withMetadataTable(your-metadata-table-name) na configuração do conector. Depois disso, siga as instruções em Como iniciar o job substituto nos documentos do Dataflow para atualizar um job em execução.

Práticas recomendadas para fluxo de alterações e o Dataflow

Veja a seguir algumas práticas recomendadas para criar conexões de fluxo de alterações usando o Dataflow.

Usar um banco de dados de metadados separado

Recomendamos criar um banco de dados separado para o conector do SpannerIO usar no armazenamento de metadados, em vez de configurá-lo para usar o banco de dados do aplicativo.

Para mais informações, consulte Considere usar um banco de dados de metadados separado.

Dimensione o cluster

Uma regra prática para um número inicial de workers em um job de fluxo de alterações do Spanner é de um worker a cada 1.000 gravações por segundo. Essa estimativa pode variar de acordo com vários fatores, como o tamanho de cada transação, quantos registros de fluxo de alterações são produzidos a partir de uma única transação e outras transformações, agregações ou coletores que estão sendo usados no pipeline.

Após a alocação de recursos, é importante acompanhar as métricas mencionadas em Monitorar um pipeline para garantir a integridade do pipeline. Recomendamos testar um tamanho inicial do pool de workers e monitorar como o pipeline lida com a carga, aumentando o número de nós, se necessário. A utilização da CPU é uma métrica importante para verificar se a carga está adequada e se mais nós são necessários.

Limitações conhecidas

Escalonamento automático

O suporte ao escalonamento automático para qualquer pipeline que inclua SpannerIO.readChangeStream requer o Apache Beam 2.39.0 ou superior.

Se você usar uma versão do Apache Beam anterior a 2.39.0, os pipelines que incluírem SpannerIO.readChangeStream precisarão especificar explicitamente o algoritmo de escalonamento automático como NONE, conforme descrito em Escalonamento automático horizontal.

Para escalonar manualmente um pipeline do Dataflow em vez de usar o escalonamento automático, consulte Escalonamento manual de um pipeline de streaming.

Executor V2

O conector de fluxo de alterações do Spanner requer o Dataflow Runner V2. Ele precisa ser especificado manualmente durante a execução. Caso contrário, um erro será gerado. Você pode especificar Runner V2 configurando seu job com --experiments=use_unified_worker,use_runner_v2.

Snapshot

O conector de fluxo de alterações do Spanner não é compatível com snapshots do Dataflow.

Reduzindo

O conector de fluxo de alterações do Spanner não é compatível com a redução de um job. Só é possível cancelar um job existente.

Também é possível atualizar um pipeline atual sem precisar interrompê-lo.

OpenCensus

Se quiser usar o OpenCensus para monitorar seu pipeline, especifique a versão 0.28.3 ou posterior.

NullPointerException no início do pipeline

Um bug na versão 2.38.0 do Apache Beam pode causar uma NullPointerException ao iniciar o pipeline em determinadas condições. Isso impede que o job seja iniciado e exibe esta mensagem de erro:

java.lang.NullPointerException: null value in entry: Cloud Storage_PROJECT_ID=null

Para resolver esse problema, use a versão 2.39.0 ou posterior do Apache Beam ou especifique manualmente a versão de beam-sdks-java-core como 2.37.0:

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-core</artifactId>
  <version>2.37.0</version>
</dependency>

Mais informações