Criar conexões de fluxo de alterações usando o Dataflow

Esta página demonstra como criar pipelines do Dataflow que consomem e encaminham dados de alteração do Spanner usando fluxos de alterações. Você pode usar o exemplo de código nesta página para criar pipelines personalizados.

Principais conceitos

Confira a seguir alguns dos principais conceitos dos pipelines do Dataflow para fluxo de alterações.

Dataflow

Dataflow é um serviço sem servidor, rápido e econômico que oferece suporte a processamento em lote. Ele oferece portabilidade no processamento de jobs escritos usando o código aberto Apache Beam (em inglês) e automatiza o provisionamento de infraestrutura e o gerenciamento de cluster. O Dataflow oferece streaming quase em tempo real durante a leitura de fluxo de alterações.

O Dataflow pode ser usado para consumir mudanças no Spanner streams com o SpannerIO conector, que oferece uma abstração na API Spanner para consultar fluxo de alterações. Com esse conector, você não precisa gerenciar o ciclo de vida da partição de fluxos de mudança, o que é necessário quando você usa a API Spanner diretamente. O conector fornece com um fluxo de registros de alteração de dados. Assim, você fica livre para se concentrar mais na lógica do aplicativo e menos nos detalhes específicos da API e o particionamento de fluxo de alterações. Recomendamos usar o conector SpannerIO em vez que a API Spanner na maioria das circunstâncias em que é preciso ler dados do fluxo de alterações.

Os modelos do Dataflow são pipelines pré-criados do Dataflow que implementam casos de uso comuns. Consulte Modelos do Dataflow para uma visão geral.

Pipeline do Dataflow

Um pipeline do Dataflow de fluxo de alterações do Spanner é composto de quatro partes principais:

  1. Um banco de dados do Spanner com um fluxo de alterações
  2. O conector SpannerIO
  3. Transformações e coletores definidos pelo usuário
  4. um gravador de E/S do coletor do Apache Beam;

imagem

Fluxo de alterações do Spanner

Para saber como criar um fluxo de alterações, consulte Criar um fluxo de alterações.

Conector SpannerIO do Apache Beam

Esse é o conector SpannerIO descrito na seção anterior do Dataflow. Ele é um conector de E/S de origem que emite uma PCollection de registros de alteração de dados. para etapas posteriores do pipeline. O horário do evento para cada registro de alteração de dados emitido será o carimbo de data/hora de confirmação. Observe que os registros emitidos são unordered, e que o conector SpannerIO garante que não haverá atrasar registros.

Ao trabalhar com fluxos de alterações, o Dataflow usa o checkpointing. Como resultado, cada worker pode aguardar até o intervalo de verificação configurado para armazenar as alterações em buffer antes de enviá-las para processamento adicional.

Transformações definidas pelo usuário

Com uma transformação definida pelo usuário, ele pode agregar, transformar ou modificar e processar dados em um pipeline do Dataflow. Uso comum casos para isso são a remoção de informações de identificação pessoal, atendendo aos requisitos de formato de dados downstream e classificação. Consulte a documentação oficial do Apache Beam com o guia de programação transformações.

Gravador de E/S do sink do Apache Beam

O Apache Beam contém conectores de E/S integrados que pode ser usado para gravar em um pipeline do Dataflow como o BigQuery. Os coletores de dados mais comuns têm suporte nativo.

Modelos do Dataflow

Os modelos do Dataflow fornecem um método para criar jobs do Dataflow com base em imagens do Docker pré-criadas para casos de uso comuns usando o console do Google Cloud, a CLI do Google Cloud ou as chamadas da API REST.

Para os fluxo de alterações do Spanner, oferecemos três Modelos Flex do Dataflow:

Definir permissões de IAM para modelos do Dataflow

Antes de criar um job do Dataflow com os três modelos Flex listados, verifique se você tem as permissões do IAM necessárias para as seguintes contas de serviço:

Se você não tiver as permissões necessárias do IAM, será necessário especificar uma conta de serviço de worker gerenciada pelo usuário para criar o job do Dataflow. Para mais informações, consulte Segurança e permissões do Dataflow.

Quando você tenta executar um job de um modelo Flex do Dataflow sem todas as permissões necessárias, o job pode falhar falha ao ler o erro do arquivo de resultado ou um permissão negada em erro de recurso. Para mais informações, consulte Resolver problemas de modelos Flex.

criar um pipeline do Dataflow

Esta seção aborda a configuração inicial do conector e fornece amostras de integrações comuns com os fluxo de alterações do Spanner .

Para seguir estas etapas, você precisa de um ambiente de desenvolvimento Java para o Dataflow. Para mais informações, consulte Criar um pipeline do Dataflow usando Java.

Criar um stream de alterações

Para saber como criar um fluxo de alterações, consulte Criar um fluxo de alterações. Para continuar as próximas etapas, você precisa ter um banco de dados do Spanner com um fluxo de alterações configurado.

Conceder privilégios de controle de acesso granular

Se você espera que usuários de controle de acesso detalhado executem o job do Dataflow, verifique se os usuários têm acesso a uma função de banco de dados com o privilégio SELECT no fluxo de alterações e o privilégio EXECUTE na função com valor de tabela do fluxo de alterações. Certifique-se também de que principal especifica o papel do banco de dados na configuração SpannerIO ou o modelo Flex do Dataflow.

Para mais informações, consulte Sobre o controle de acesso detalhado.

Adicionar o conector SpannerIO como uma dependência

O conector SpannerIO do Apache Beam encapsula a complexidade do consumo dos fluxos de alterações diretamente usando a API Cloud Spanner, emitindo uma PCollection de registros de dados de fluxo de alterações para estágios posteriores do pipeline.

Esses objetos podem ser consumidos em outros estágios do ciclo de vida pipeline do Dataflow. A integração do fluxo de alterações faz parte conector SpannerIO. Para usar o conector SpannerIO, a dependência precisa ser adicionada ao arquivo pom.xml:

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
  <version>${beam-version}</version> <!-- available from version 2.38.0 -->
</dependency>

Criar um banco de dados de metadados

O conector precisa acompanhar cada partição ao executar o pipeline do Apache Beam. Ele mantém esses metadados em uma tabela Spanner criada pelo conector durante a inicialização. Você especifica no qual a tabela será criada durante a configuração do conector.

Conforme descrito em práticas recomendadas de fluxos de alterações, recomendamos a criação de um novo banco de dados para essa finalidade, em vez de permitir conector para usar o banco de dados do aplicativo para armazenar a tabela de metadados dele.

O proprietário de um job do Dataflow que usa o conector SpannerIO precisa ter as seguintes permissões do IAM definidas com este banco de dados de metadados:

  • spanner.databases.updateDdl
  • spanner.databases.beginReadOnlyTransaction
  • spanner.databases.beginOrRollbackReadWriteTransaction
  • spanner.databases.read
  • spanner.databases.select
  • spanner.databases.write
  • spanner.sessions.create
  • spanner.sessions.get

Configurar o conector

O conector de fluxo de alterações do Spanner pode ser configurado da seguinte maneira:

SpannerConfig spannerConfig = SpannerConfig
  .create()
  .withProjectId("my-project-id")
  .withInstanceId("my-instance-id")
  .withDatabaseId("my-database-id")
  .withDatabaseRole("my-database-role");    // Needed for fine-grained access control only

Timestamp startTime = Timestamp.now();
Timestamp endTime = Timestamp.ofTimeSecondsAndNanos(
   startTime.getSeconds() + (10 * 60),
   startTime.getNanos()
);

SpannerIO
  .readChangeStream()
  .withSpannerConfig(spannerConfig)
  .withChangeStreamName("my-change-stream")
  .withMetadataInstance("my-meta-instance-id")
  .withMetadataDatabase("my-meta-database-id")
  .withMetadataTable("my-meta-table-name")
  .withRpcPriority(RpcPriority.MEDIUM)
  .withInclusiveStartAt(startTime)
  .withInclusiveEndAt(endTime);

Confira a seguir descrições das opções de readChangeStream():

Configuração do Spanner (obrigatória)

Usado para configurar o projeto, a instância e o banco de dados em que o fluxo de alterações foi criado e deve ser consultado. Também especifica, opcionalmente, o papel do banco de dados a ser usado quando o principal do IAM que está executando o job do Dataflow é um usuário de controle de acesso granular. O job assume essa função de banco de dados para acessar o fluxo de alterações. Para mais informações, consulte Sobre o controle de acesso detalhado.

Nome do fluxo de alterações (obrigatório)

Esse nome identifica exclusivamente o fluxo de mudanças. Esse nome precisa ser o mesmo usado na criação.

ID da instância de metadados (opcional)

Essa é a instância para armazenar os metadados usados pelo conector para controlar o consumo dos dados da API do fluxo de alterações.

ID do banco de dados de metadados (obrigatório)

É o banco de dados para armazenar os metadados usados pelo conector para controlar o consumo dos dados da API do fluxo de alterações.

Nome da tabela de metadados (opcional)

Use isso somente ao atualizar um pipeline atual.

Este é o nome de tabela de metadados preexistente a ser usado pelo conector. Ela é usada pelo conector para armazenar os metadados para controlar o consumo dos dados da API do fluxo de alterações. Se essa opção for omitido, o Spanner cria uma nova tabela com um valor na inicialização do conector.

Prioridade da RPC (opcional)

A solicitação prioridade seja usada para as consultas de fluxo de alterações. Se esse parâmetro for omitido, high priority será usado.

InclusiveStartAt (obrigatório)

As mudanças do carimbo de data/hora fornecido são retornadas ao autor da chamada.

InclusiveEndAt (opcional)

As mudanças até o carimbo de data/hora fornecido são retornadas ao autor da chamada. Se esse parâmetro for omitido, as mudanças serão emitidas indefinidamente.

Adicionar transformações e coletores para processar dados alterados

Com as etapas anteriores concluídas, o conector SpannerIO configurado está pronto para emitir uma PCollection de objetos DataChangeRecord. Consulte Exemplos de transformações e coletores para ver diversos exemplos. configurações de pipeline que processam esses dados transmitidos de várias maneiras.

Os registros de fluxo de alterações emitidos pelo conector SpannerIO não são ordenados. Isso ocorre porque as PCollections não oferecem garantias de ordenação. Se você precisar de um fluxo ordenado, agrupe e classifique os registros como transformações nos pipelines. Consulte Exemplo: ordenar por chave. É possível estender esta amostra para classificar os registros com base em qualquer campo dos registros, como por IDs de transação.

Exemplos de transformações e coletores

Você pode definir suas próprias transformações e especificar coletores para gravar os dados. A documentação do Apache Beam oferece uma infinidade de transformações que podem ser aplicadas, além de conectores de E/S prontos para uso para gravar os dados em sistemas externos.

Amostra: ordenar por chave

Este exemplo de código emite registros de alteração de dados ordenados por carimbo de data/hora de confirmação e agrupados por chaves primárias usando o conector do Dataflow.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new BreakRecordByModFn()))
  .apply(ParDo.of(new KeyByIdFn()))
  .apply(ParDo.of(new BufferKeyUntilOutputTimestamp()))
  // Subsequent processing goes here

Este exemplo de código usa estados e timers para armazenar registros de cada chave e define o tempo de expiração do timer para um tempo T configurado pelo usuário no futuro (definido na função BufferKeyUntilOutputTimestamp). Quando a marca d'água do Dataflow passa do tempo T, esse código limpa todos os registros no buffer com carimbo de data/hora menor que T, ordena esses registros por carimbo de data/hora de confirmação e gera um par de chave-valor em que:

  • A chave é a chave de entrada, que é a chave primária hashizada para uma matriz de bucket de tamanho 1.000.
  • O valor são os registros de alteração de dados ordenados que foram armazenados em buffer para a chave.

Temos as seguintes garantias para cada chave:

  • Os temporizadores são acionados na ordem do carimbo de data/hora de expiração.
  • Os estágios downstream têm garantia de receber os elementos na mesma ordem em que foram produzidos.

Por exemplo, com uma chave do valor 100, o timer dispara em T1 e T10, respectivamente, produzindo um pacote de registros de alteração de dados em cada carimbo de data/hora. Como os registros de alteração de dados gerados em T1 foram produzidos antes dos registros de alteração de dados gerados em T10, os registros de alteração de dados gerados em T1 também têm a garantia de serem recebidos no próximo estágio, antes dos registros de alteração de dados gerados em T10. Esse mecanismo nos ajuda a garantir a ordem estrita do carimbo de data/hora de confirmação por chave primária para o processamento downstream.

Esse processo será repetido até que o pipeline termine e todos os registros de alteração de dados tenham sido processados. Se não for especificado um horário de término, ele será repetido indefinidamente.

Observe que este exemplo de código usa estados e timers, em vez de janelas, para realizar a ordenação por chave. O motivo é que não há garantia de que as janelas serão processadas na ordem certa. Isso significa que as janelas mais antigas podem ser processadas mais tarde do que as mais recentes, o que pode resultar em um processamento fora de ordem.

BreakRecordByModFn

Cada registro de mudança de dados pode conter várias modificações. Cada modificação representa uma inserção, atualização ou exclusão de um único valor de chave primária. Essa função divide cada registro de mudança de dados em registros separados, um por mod.

private static class BreakRecordByModFn extends DoFn<DataChangeRecord,
                                                     DataChangeRecord>  {
  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record, OutputReceiver<DataChangeRecord>
    outputReceiver) {
    record.getMods().stream()
      .map(
          mod ->
              new DataChangeRecord(
                  record.getPartitionToken(),
                  record.getCommitTimestamp(),
                  record.getServerTransactionId(),
                  record.isLastRecordInTransactionInPartition(),
                  record.getRecordSequence(),
                  record.getTableName(),
                  record.getRowType(),
                  Collections.singletonList(mod),
                  record.getModType(),
                  record.getValueCaptureType(),
                  record.getNumberOfRecordsInTransaction(),
                  record.getNumberOfPartitionsInTransaction(),
                  record.getTransactionTag(),
                  record.isSystemTransaction(),
                  record.getMetadata()))
      .forEach(outputReceiver::output);
  }
}

KeyByIdFn

Essa função recebe um DataChangeRecord e gera um DataChangeRecord codificado pela chave primária do Spanner com hash para um valor inteiro.

private static class KeyByIdFn extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>>  {
  // NUMBER_OF_BUCKETS should be configured by the user to match their key cardinality
  // Here, we are choosing to hash the Spanner primary keys to a bucket index, in order to have a deterministic number
  // of states and timers for performance purposes.
  // Note that having too many buckets might have undesirable effects if it results in a low number of records per bucket
  // On the other hand, having too few buckets might also be problematic, since many keys will be contained within them.
  private static final int NUMBER_OF_BUCKETS = 1000;

  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record,
      OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
    int hashCode = (int) record.getMods().get(0).getKeysJson().hashCode();
    // Hash the received keys into a bucket in order to have a
    // deterministic number of buffers and timers.
    String bucketIndex = String.valueOf(hashCode % NUMBER_OF_BUCKETS);

    outputReceiver.output(KV.of(bucketIndex, record));
  }
}

BufferKeyUntilOutputTimestamp

Timers e buffers são por chave. Essa função armazena em buffer cada registro de alteração de dados até que a marca d'água passe do carimbo de data/hora em que queremos gerar os registros de alteração de dados em buffer.

Este código usa um timer em loop para determinar quando limpar o buffer:

  1. Quando um registro de alteração de dados de uma chave é encontrado pela primeira vez, o timer é acionado no carimbo de data/hora de confirmação do registro de alteração de dados + incrementIntervalSeconds (uma opção configurável pelo usuário).
  2. Quando o timer é acionado, ele adiciona todos os registros de alteração de dados no buffer com um carimbo de data/hora menor que o tempo de expiração do timer para recordsToOutput. Se o buffer tiver registros de alteração de dados com carimbo de data e hora maior ou igual ao tempo de expiração do timer, ele adicionará esses registros de alteração de dados de volta ao buffer em vez de enviá-los. Em seguida, ele define o próximo timer para o tempo de expiração do timer atual mais incrementIntervalInSeconds.
  3. Se recordsToOutput não estiver vazio, a função vai ordenar os registros de mudança de dados em recordsToOutput por carimbo de data/hora de confirmação e ID da transação e, em seguida, vai gerar a saída.
private static class BufferKeyUntilOutputTimestamp extends
    DoFn<KV<String, DataChangeRecord>, KV<String, Iterable<DataChangeRecord>>>  {
  private static final Logger LOG =
      LoggerFactory.getLogger(BufferKeyUntilOutputTimestamp.class);

  private final long incrementIntervalInSeconds = 2;

  private BufferKeyUntilOutputTimestamp(long incrementIntervalInSeconds) {
    this.incrementIntervalInSeconds = incrementIntervalInSeconds;
  }

  @SuppressWarnings("unused")
  @TimerId("timer")
  private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @StateId("buffer")
  private final StateSpec<BagState<DataChangeRecord>> buffer = StateSpecs.bag();

  @StateId("keyString")
  private final StateSpec<ValueState<String>> keyString =
      StateSpecs.value(StringUtf8Coder.of());

  @ProcessElement
  public void process(
      @Element KV<String, DataChangeRecord> element,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @TimerId("timer") Timer timer,
      @StateId("keyString") ValueState<String> keyString) {
    buffer.add(element.getValue());

    // Only set the timer if this is the first time we are receiving a data change
    // record with this key.
    String elementKey = keyString.read();
    if (elementKey == null) {
      Instant commitTimestamp =
          new Instant(element.getValue().getCommitTimestamp().toSqlTimestamp());
      Instant outputTimestamp =
          commitTimestamp.plus(Duration.standardSeconds(incrementIntervalInSeconds));
      timer.set(outputTimestamp);
      keyString.write(element.getKey());
    }
  }

  @OnTimer("timer")
  public void onExpiry(
      OnTimerContext context,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @TimerId("timer") Timer timer,
      @StateId("keyString") ValueState<String> keyString) {
    if (!buffer.isEmpty().read()) {
      String elementKey = keyString.read();

      final List<DataChangeRecord> records =
          StreamSupport.stream(buffer.read().spliterator(), false)
              .collect(Collectors.toList());
      buffer.clear();

      List<DataChangeRecord> recordsToOutput = new ArrayList<>();
      for (DataChangeRecord record : records) {
        Instant recordCommitTimestamp =
            new Instant(record.getCommitTimestamp().toSqlTimestamp());
        final String recordString =
            record.getMods().get(0).getNewValuesJson().isEmpty()
                ? "Deleted record"
                : record.getMods().get(0).getNewValuesJson();
        // When the watermark passes time T, this means that all records with
        // event time < T have been processed and successfully committed. Since the
        // timer fires when the watermark passes the expiration time, we should
        // only output records with event time < expiration time.
        if (recordCommitTimestamp.isBefore(context.timestamp())) {
          LOG.info(
             "Outputting record with key {} and value {} at expiration " +
             "timestamp {}",
              elementKey,
              recordString,
              context.timestamp().toString());
          recordsToOutput.add(record);
        } else {
          LOG.info(
              "Expired at {} but adding record with key {} and value {} back to " +
              "buffer due to commit timestamp {}",
              context.timestamp().toString(),
              elementKey,
              recordString,
              recordCommitTimestamp.toString());
          buffer.add(record);
        }
      }

      // Output records, if there are any to output.
      if (!recordsToOutput.isEmpty()) {
        // Order the records in place, and output them. The user would need
        // to implement DataChangeRecordComparator class that sorts the
        // data change records by commit timestamp and transaction ID.
        Collections.sort(recordsToOutput, new DataChangeRecordComparator());
        context.outputWithTimestamp(
            KV.of(elementKey, recordsToOutput), context.timestamp());
        LOG.info(
            "Expired at {}, outputting records for key {}",
            context.timestamp().toString(),
            elementKey);
      } else {
        LOG.info("Expired at {} with no records", context.timestamp().toString());
      }
    }

    Instant nextTimer = context.timestamp().plus(Duration.standardSeconds(incrementIntervalInSeconds));
    if (buffer.isEmpty() != null && !buffer.isEmpty().read()) {
      LOG.info("Setting next timer to {}", nextTimer.toString());
      timer.set(nextTimer);
    } else {
      LOG.info(
          "Timer not being set since the buffer is empty: ");
      keyString.clear();
    }
  }
}

Pedidos de transações

Esse pipeline pode ser alterado para ordenar por ID da transação e carimbo de data/hora de confirmação. Para fazer isso, registre os registros de buffer para cada par de ID de transação / carimbo de data/hora de confirmação, em vez de cada chave do Spanner. Isso requer a modificação do código em KeyByIdFn.

Amostra: transações de montagem

Este exemplo de código lê registros de alteração de dados, reúne todos os registros de alteração de dados pertencentes à mesma transação em um único elemento e gera esse elemento. As transações geradas por este código de exemplo não são ordenadas por carimbo de data/hora de confirmação.

Este exemplo de código usa buffers para montar transações de registros de alteração de dados. Ao receber um registro de alteração de dados pertencente a uma transação pela primeira vez, ele lê o campo numberOfRecordsInTransaction no registro de alteração de dados, que descreve o número esperado de registros de alteração de dados pertencentes a essa transação. Ele armazena em buffer os registros de mudança de dados pertencentes a essa transação até que o número de registros em buffer corresponda a numberOfRecordsInTransaction, quando ele vai gerar os registros de mudança de dados agrupados.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new KeyByTransactionIdFn()))
  .apply(ParDo.of(new TransactionBoundaryFn()))
  // Subsequent processing goes here

KeyByTransactionIdFn

Essa função recebe um DataChangeRecord e gera um DataChangeRecord com chave pelo ID da transação.

private static class KeyByTransactionIdFn extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>>  {
  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record,
      OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
    outputReceiver.output(KV.of(record.getServerTransactionId(), record));
  }
}

TransactionBoundaryFn

TransactionBoundaryFn armazena em buffer os pares de chave-valor de {TransactionId, DataChangeRecord} recebidos de KeyByTransactionIdFn e os armazena em grupos com base em TransactionId. Quando o número de registros armazenados em buffer é igual ao número de registros contidos no transação inteira, essa função classifica os objetos DataChangeRecord no grupo por sequência de registro e gera um par de chave-valor de {CommitTimestamp, TransactionId}, Iterable<DataChangeRecord>.

Aqui, estamos assumindo que SortKey é uma classe definida pelo usuário que representa um par de {CommitTimestamp, TransactionId}. Para mais informações sobre o SortKey, consulte o exemplo de implementação.

private static class TransactionBoundaryFn extends DoFn<KV<String, DataChangeRecord>, KV<SortKey, Iterable<DataChangeRecord>>>  {
  @StateId("buffer")
  private final StateSpec<BagState<DataChangeRecord>> buffer = StateSpecs.bag();

  @StateId("count")
  private final StateSpec<ValueState<Integer>> countState = StateSpecs.value();

  @ProcessElement
  public void process(
      ProcessContext context,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @StateId("count") ValueState<Integer> countState) {
    final KV<String, DataChangeRecord> element = context.element();
    final DataChangeRecord record = element.getValue();

    buffer.add(record);
    int count = (countState.read() != null ? countState.read() : 0);
    count = count + 1;
    countState.write(count);

    if (count == record.getNumberOfRecordsInTransaction()) {
      final List<DataChangeRecord> sortedRecords =
          StreamSupport.stream(buffer.read().spliterator(), false)
              .sorted(Comparator.comparing(DataChangeRecord::getRecordSequence))
              .collect(Collectors.toList());

      final Instant commitInstant =
          new Instant(sortedRecords.get(0).getCommitTimestamp().toSqlTimestamp()
              .getTime());
      context.outputWithTimestamp(
          KV.of(
              new SortKey(sortedRecords.get(0).getCommitTimestamp(),
                          sortedRecords.get(0).getServerTransactionId()),
              sortedRecords),
          commitInstant);
      buffer.clear();
      countState.clear();
    }
  }
}

Exemplo: filtrar por tag de transação

Quando uma transação que modifica os dados do usuário é marcada, a tag correspondente e o tipo dela são armazenados como parte de DataChangeRecord. Estes exemplos demonstram como filtrar registros de fluxo de mudanças com base em tags de transação definidas pelo usuário e tags do sistema:

Filtragem de tags definidas pelo usuário para my-tx-tag:

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(Filter.by(record ->
           !record.isSystemTransaction()
           && record.getTransactionTag().equalsIgnoreCase("my-tx-tag")))
  // Subsequent processing goes here

Filtragem de tags do sistema/auditoria de TTL:

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(Filter.by(record ->
           record.isSystemTransaction()
           && record.getTransactionTag().equals("RowDeletionPolicy")))
  // Subsequent processing goes here

Exemplo: buscar linha completa

Este exemplo funciona com uma tabela do Spanner chamada Singer que tem a seguinte definição:

CREATE TABLE Singers (
  SingerId INT64 NOT NULL,
  FirstName STRING(1024),
  LastName STRING(1024)
) PRIMARY KEY (SingerId);

No modo de captura de valor OLD_AND_NEW_VALUES padrão dos fluxos de mudança, quando há uma atualização em uma linha do Spanner, o registro de mudança de dados recebido vai conter apenas as colunas que foram alteradas. Rastreado, mas as colunas inalteradas não serão incluídas no registro. A chave primária do A modificação pode ser usada para fazer um snapshot do Spanner lido na confirmação carimbo de data/hora do registro de alteração de dados para buscar as colunas inalteradas ou mesmo recuperar a linha completa.

A política de retenção do banco de dados pode precisar ser alterada para um valor maior ou igual à política de retenção do fluxo de alterações para que o snapshot leitura para ter sucesso.

O uso do tipo de captura de valor NEW_ROW é a abordagem maneira mais eficiente de fazer isso, já que retorna todas as colunas rastreadas da linha por padrão e não exige a leitura de um snapshot extra no Spanner.

SpannerConfig spannerConfig = SpannerConfig
   .create()
   .withProjectId("my-project-id")
   .withInstanceId("my-instance-id")
   .withDatabaseId("my-database-id")
   .withDatabaseRole("my-database-role");   // Needed for fine-grained access control only

pipeline
   .apply(SpannerIO
       .readChangeStream()
       .withSpannerConfig(spannerConfig)
       // Assume we have a change stream "my-change-stream" that watches Singers table.
       .withChangeStreamName("my-change-stream")
       .withMetadataInstance("my-metadata-instance-id")
       .withMetadataDatabase("my-metadata-database-id")
       .withInclusiveStartAt(Timestamp.now()))
   .apply(ParDo.of(new ToFullRowJsonFn(spannerConfig)))
   // Subsequent processing goes here

ToFullRowJsonFn

Essa transformação vai executar uma leitura desatualizada no carimbo de data/hora de confirmação de cada recebido e mapear a linha completa em JSON.

public class ToFullRowJsonFn extends DoFn<DataChangeRecord, String> {
 // Since each instance of this DoFn will create its own session pool and will
 // perform calls to Spanner sequentially, we keep the number of sessions in
 // the pool small. This way, we avoid wasting resources.
 private static final int MIN_SESSIONS = 1;
 private static final int MAX_SESSIONS = 5;
 private final String projectId;
 private final String instanceId;
 private final String databaseId;

 private transient DatabaseClient client;
 private transient Spanner spanner;

 public ToFullRowJsonFn(SpannerConfig spannerConfig) {
   this.projectId = spannerConfig.getProjectId().get();
   this.instanceId = spannerConfig.getInstanceId().get();
   this.databaseId = spannerConfig.getDatabaseId().get();
 }

 @Setup
 public void setup() {
   SessionPoolOptions sessionPoolOptions = SessionPoolOptions
      .newBuilder()
      .setMinSessions(MIN_SESSIONS)
      .setMaxSessions(MAX_SESSIONS)
      .build();
   SpannerOptions options = SpannerOptions
       .newBuilder()
       .setProjectId(projectId)
       .setSessionPoolOption(sessionPoolOptions)
       .build();
   DatabaseId id = DatabaseId.of(projectId, instanceId, databaseId);
   spanner = options.getService();
   client = spanner.getDatabaseClient(id);
 }

 @Teardown
 public void teardown() {
   spanner.close();
 }

 @ProcessElement
 public void process(
   @Element DataChangeRecord element,
   OutputReceiver<String> output) {
   com.google.cloud.Timestamp commitTimestamp = element.getCommitTimestamp();
   element.getMods().forEach(mod -> {
     JSONObject keysJson = new JSONObject(mod.getKeysJson());
     JSONObject newValuesJson = new JSONObject(mod.getNewValuesJson());
     ModType modType = element.getModType();
     JSONObject jsonRow = new JSONObject();
     long singerId = keysJson.getLong("SingerId");
     jsonRow.put("SingerId", singerId);
     if (modType == ModType.INSERT) {
       // For INSERT mod, get non-primary key columns from mod.
       jsonRow.put("FirstName", newValuesJson.get("FirstName"));
       jsonRow.put("LastName", newValuesJson.get("LastName"));
     } else if (modType == ModType.UPDATE) {
       // For UPDATE mod, get non-primary key columns by doing a snapshot read using the primary key column from mod.
       try (ResultSet resultSet = client
         .singleUse(TimestampBound.ofReadTimestamp(commitTimestamp))
         .read(
           "Singers",
           KeySet.singleKey(com.google.cloud.spanner.Key.of(singerId)),
             Arrays.asList("FirstName", "LastName"))) {
         if (resultSet.next()) {
           jsonRow.put("FirstName", resultSet.isNull("FirstName") ?
             JSONObject.NULL : resultSet.getString("FirstName"));
           jsonRow.put("LastName", resultSet.isNull("LastName") ?
             JSONObject.NULL : resultSet.getString("LastName"));
         }
       }
     } else {
       // For DELETE mod, there is nothing to do, as we already set SingerId.
     }

     output.output(jsonRow.toString());
   });
 }
}

Este código cria um cliente de banco de dados do Spanner para executar busca de linha e configura o pool de sessões para ter apenas algumas sessões, realizando leituras em uma instância do ToFullReowJsonFn sequencialmente. O Dataflow garante a geração de muitas instâncias dessa função, cada um com o próprio pool de clientes.

Amostra: do Spanner para o Pub/Sub

Nesse cenário, o autor da chamada transmite os registros para o Pub/Sub o mais rápido possível, sem agrupamento nem agregação. Esta é uma boa adequada para acionar o processamento downstream, como transmitir todas as novas linhas inseridos em uma tabela do Spanner para Pub/Sub para mais processamento.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(MapElements.into(TypeDescriptors.strings()).via(Object::toString))
  .apply(PubsubIO.writeStrings().to("my-topic"));

O coletor do Pub/Sub pode ser configurado para garantir a semântica exatamente uma vez.

Amostra: Spanner para Cloud Storage

Nesse cenário, o autor da chamada agrupa todos os registros em uma determinada janela e salva o grupo em arquivos separados do Cloud Storage. Essa é uma boa opção para análises e arquivamento pontual, que é independente do período de armazenamento do Spanner.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(MapElements.into(TypeDescriptors.strings()).via(Object::toString))
  .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
  .apply(TextIO
    .write()
    .to("gs://my-bucket/change-stream-results-")
    .withSuffix(".txt")
    .withWindowedWrites()
    .withNumShards(1));

O coletor do Cloud Storage fornece semântica pelo menos uma vez por padrão. Com processamento extra, ele pode ser modificado para ter semântica exatamente uma vez.

Também oferecemos um modelo do Dataflow para esse caso de uso: consulte Conectar fluxos de alterações ao Cloud Storage.

Exemplo: Spanner para BigQuery (tabela de registro)

Aqui, o autor da chamada transmite registros de mudança para o BigQuery. Cada registro de alteração de dados é refletido como uma linha no BigQuery. Essa é uma boa opção para análises. Esse código usa as funções definidas anteriormente, na seção Buscar linha completa, para recuperar a linha completa do registro e gravá-la no BigQuery.

SpannerConfig spannerConfig = SpannerConfig
  .create()
  .withProjectId("my-project-id")
  .withInstanceId("my-instance-id")
  .withDatabaseId("my-database-id")
  .withDatabaseRole("my-database-role");   // Needed for fine-grained access control only

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(spannerConfig)
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new ToFullRowJsonFn(spannerConfig)))
  .apply(BigQueryIO
    .<String>write()
    .to("my-bigquery-table")
    .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
    .withWriteDisposition(Write.WriteDisposition.WRITE_APPEND)
    .withSchema(new TableSchema().setFields(Arrays.asList(
      new TableFieldSchema()
        .setName("SingerId")
        .setType("INT64")
        .setMode("REQUIRED"),
      new TableFieldSchema()
        .setName("FirstName")
        .setType("STRING")
        .setMode("REQUIRED"),
      new TableFieldSchema()
        .setName("LastName")
        .setType("STRING")
        .setMode("REQUIRED")
    )))
    .withAutoSharding()
    .optimizedWrites()
    .withFormatFunction((String element) -> {
      ObjectMapper objectMapper = new ObjectMapper();
      JsonNode jsonNode = null;
      try {
        jsonNode = objectMapper.readTree(element);
      } catch (IOException e) {
        e.printStackTrace();
      }
      return new TableRow()
        .set("SingerId", jsonNode.get("SingerId").asInt())
        .set("FirstName", jsonNode.get("FirstName").asText())
        .set("LastName", jsonNode.get("LastName").asText());
    }
  )
);

O coletor do BigQuery fornece semântica pelo menos uma vez por padrão. Com processamento extra, ele pode ser modificado para ter semântica exatamente uma vez.

Também oferecemos um modelo do Dataflow para esse caso de uso. Consulte Conectar fluxos de alterações ao BigQuery.

Monitorar um pipeline

Há duas classes de métricas disponíveis para monitorar um pipeline do Dataflow de fluxo de alterações.

Métricas padrão do Dataflow

O Dataflow fornece várias métricas para garantir a integridade do job, como atualização de dados, atraso do sistema, capacidade de processamento do job, uso de CPU do worker e muito mais. Você encontra mais informações em Como usar o Monitoring em pipelines do Dataflow.

Para pipelines de fluxo de alterações, há duas métricas principais que precisam ser consideradas: a latência do sistema e a atualização de dados.

A latência do sistema informa a duração máxima atual (em segundos) em que um item de dados é processado ou está aguardando processamento.

A atualização de dados mostra o tempo entre o momento atual (tempo real) e a marca d'água de saída. A marca-d'água de saída com o tempo T indica que todos os elementos com um horário de evento (estritamente) antes de T foram processados para cálculo. Em outras palavras, a atualização de dados mede o quanto o pipeline está atualizado em relação ao processamento dos eventos recebidos.

Se houver poucos recursos no pipeline, você verá esse efeito nas duas métricas. A latência do sistema vai aumentar porque os itens precisam esperar mais tempo antes de serem processados. A atualização dos dados também vai aumentar, porque o pipeline não vai conseguir acompanhar a quantidade de dados recebidos.

Métricas personalizadas do fluxo de alterações

Essas métricas são expostas no Cloud Monitoring e incluem:

  • Latência bucketizada (histograma) entre um registro sendo confirmado no Spanner e emitido em uma PCollection pelo conector. Essa métrica pode ser usada para conferir problemas de desempenho (latência) com o pipeline.
  • Número total de registros de dados lidos. É uma indicação geral do número de registros emitidos pelo conector. Esse número precisa estar sempre aumentando, espelhando a tendência de gravações no banco de dados subjacente do Spanner.
  • Número de partições que estão sendo lidas. É preciso que haja sempre a leitura de partições. Se esse número for zero, isso indica que ocorreu um erro no pipeline.
  • Número total de consultas emitidas durante a execução do conector. Essa é uma indicação geral das consultas de fluxo de mudanças feitas na instância do Spanner durante a execução do pipeline. Isso pode ser usado para estimar a carga do conector para o banco de dados do Spanner.

Atualizar um pipeline

É possível atualizar um pipeline em execução que usa SpannerIO para processar fluxo de alterações se a compatibilidade do job verificações sejam aprovados. Para fazer isso, é necessário definir explicitamente o parâmetro de nome da tabela de metadados do novo job ao atualizá-lo. Use o valor da opção de pipeline metadataTable do job que você está atualizando.

Se você estiver usando um modelo do Dataflow fornecido pelo Google, defina o nome da tabela usando o parâmetro spannerMetadataTableName. Também é possível modificar job atual para usar explicitamente a tabela de metadados com o método withMetadataTable(your-metadata-table-name) pol. a configuração do conector. Depois de fazer isso, você pode seguir o instruções em Como iniciar uma substituição job no Documentação do Dataflow para atualizar um job em execução.

Práticas recomendadas para fluxo de alterações e o Dataflow

Confira a seguir algumas práticas recomendadas para criar conexões de fluxos de alterações usando o Dataflow.

Usar um banco de dados de metadados separado

Recomendamos criar um banco de dados separado para o conector SpannerIO usar para armazenamento de metadados, em vez de configurá-lo para usar o banco de dados do aplicativo.

Para mais informações, consulte Considere usar um banco de dados de metadados separado.

Dimensione o cluster

Uma regra geral para um número inicial de workers em um job de fluxos de mudança do Spanner é um worker por 1.000 gravações por segundo. Essa estimativa pode variar dependendo de vários fatores, como o tamanho de cada transação, quantos registros de fluxo de mudanças são produzidos a partir de uma única transação e outras transformações, agregações ou coletores que estão sendo usados no pipeline.

Após a realocação inicial, é importante acompanhar as métricas mencionadas em Monitorar um pipeline, para garantir que o pipeline esteja funcionando corretamente. Recomendamos testar um modelo tamanho do pool de workers e monitorar o desempenho o pipeline lida com a carga, aumentando o número de nós, se necessário. A utilização da CPU é uma métrica importante para verificar se a carga é adequada e se mais nós são necessários.

Limitações conhecidas

Há algumas limitações conhecidas ao usar fluxo de alterações do Spanner com o Dataflow:

Escalonamento automático

Suporte ao escalonamento automático para qualquer pipeline que inclua SpannerIO.readChangeStream requer o Apache Beam 2.39.0 ou mais recente.

Se você usar uma versão do Apache Beam anterior a 2.39.0, os pipelines que incluem SpannerIO.readChangeStream precisa especificar explicitamente o escalonamento automático algoritmo como NONE, conforme descrito em Escalonamento automático horizontal.

Para escalonar manualmente um pipeline do Dataflow em vez de usar o escalonamento automático, consulte Como escalonar manualmente um pipeline de streaming.

Runner V2

O conector de fluxo de alterações do Spanner exige Executor do Dataflow V2 Precisa ser especificado manualmente durante a execução. Caso contrário, será gerado um erro. gerada. Você pode especificar Runner V2 configurando seu job com --experiments=use_unified_worker,use_runner_v2.

Snapshot

O conector de fluxos de alterações do Spanner não é compatível com snapshots do Dataflow.

Reduzindo

O conector de fluxos de mudanças do Spanner não oferece suporte a esvaziamento de um job. Só é possível cancelar um job existente.

Você também pode atualizar um pipeline atual sem precisar interrompê-lo.

OpenCensus

Para usar o OpenCensus para monitorar seu pipeline, especifique a versão 0.28.3 ou mais recente.

NullPointerException no início do pipeline

Um bug na versão 2.38.0 do Apache Beam pode causar uma NullPointerException ao iniciar o pipeline em determinadas condições. Isso impediria o início do job e mostraria esta mensagem de erro:

java.lang.NullPointerException: null value in entry: Cloud Storage_PROJECT_ID=null

Para resolver esse problema, use a versão 2.39.0 ou mais recente do Apache Beam ou especifique manualmente a versão de beam-sdks-java-core como 2.37.0:

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-core</artifactId>
  <version>2.37.0</version>
</dependency>

Mais informações