Creare connessioni di modifiche in tempo reale utilizzando Dataflow

Questa pagina mostra come creare pipeline Dataflow che consumare e inoltrare i dati di modifica di Spanner utilizzando flussi di modifiche. Puoi utilizzare il codice di esempio in questa pagina per per creare pipeline personalizzate.

Concetti principali

Di seguito sono riportati alcuni concetti fondamentali delle pipeline Dataflow per le modifiche in tempo reale.

Dataflow

Dataflow è un servizio serverless, veloce e conveniente che supporta sia i flussi di dati nell'elaborazione batch. Offre la portabilità con job di elaborazione scritti utilizzando open source Apache Beam librerie e automatizza il provisioning dell'infrastruttura e la gestione dei cluster. Dataflow fornisce flussi di dati quasi in tempo reale durante la lettura dai modifiche in tempo reale.

Puoi usare Dataflow per applicare la modifica di Spanner i flussi di dati con SpannerIO , che offre un'astrazione sull'API Spanner per eseguire query sulle modifiche in tempo reale. Con questo connettore, non dovrai gestire le modifiche in tempo reale del ciclo di vita della partizione, necessaria quando utilizzi direttamente l'API Spanner. Il connettore ti fornisce con un flusso di record delle modifiche dei dati, così puoi concentrarti meglio sulla logica dell'applicazione e meno sui dettagli specifici delle API e delle il partizionamento delle modifiche in tempo reale. Consigliamo di utilizzare il connettore SpannerIO anziché dell'API Spanner nella maggior parte dei casi in cui è necessario i dati delle modifiche in tempo reale.

I modelli Dataflow sono pipeline Dataflow predefinite che implementano casi d'uso comuni. Consulta Modelli Dataflow per una panoramica.

Pipeline Dataflow

Una pipeline Dataflow in modifiche in tempo reale di Spanner è composta di quattro parti principali:

  1. Un database Spanner con un flusso di modifiche
  2. Il connettore SpannerIO
  3. Trasformazioni e sink definiti dall'utente
  4. Un writer I/O sink

immagine

Ciascuna di queste opzioni viene discussa più in dettaglio di seguito.

Modifiche in tempo reale Spanner

Per maggiori dettagli su come creare un flusso di modifiche, consulta Creare un flusso di modifiche.

Connettore SpannerIO Apache Beam

Si tratta del connettore SpannerIO descritto in precedenza. È una connettore di I/O di origine che emette un PCollection di record delle modifiche dei dati nelle fasi successive della pipeline. La ora evento per ogni record delle modifiche dei dati emesse sarà il timestamp di commit. Nota che i record emessi sono non ordinato e che il connettore SpannerIO garantisca che non saranno record ritardati.

Quando si lavora con le modifiche in tempo reale, Dataflow utilizza il checkpoint. Di conseguenza, ogni worker potrebbe attendere fino all'intervallo di checkpoint configurato per il buffering delle modifiche prima di inviarle per un'ulteriore elaborazione.

Trasformazioni definite dall'utente

Una trasformazione definita dall'utente consente a un utente di aggregare, trasformare o modificare dei dati all'interno di una pipeline Dataflow. Uso comune i casi sono la rimozione di informazioni che consentono l'identificazione personale, soddisfacendo i requisiti di formato dei dati downstream e l'ordinamento. Consulta le documentazione ufficiale di Apache Beam per la guida alla programmazione su trasforma.

Writer I/O sink Apache Beam

Apache Beam contiene trasformazioni I/O integrate utilizzata per scrivere da una pipeline Dataflow in un data sink come BigQuery. I data sink più comuni sono supportati in modo nativo.

Modelli Dataflow

Dataflow modelli offrono una facile per creare job Dataflow basati su Docker predefiniti per casi d'uso comuni tramite la console Google Cloud, lo strumento Chiamate dell'interfaccia a riga di comando o API Rest.

Per le modifiche in tempo reale di Spanner, forniamo tre Modelli flessibili di Dataflow:

Crea una pipeline Dataflow

Questa sezione illustra la configurazione iniziale del connettore e fornisce esempi per integrazioni comuni con le modifiche in tempo reale di Spanner funzionalità.

Per seguire questi passaggi, è necessario un ambiente di sviluppo Java per Dataflow. Per ulteriori informazioni, vedi Crea una pipeline Dataflow utilizzando Java.

Crea un flusso di modifiche

Per maggiori dettagli su come creare un flusso di modifiche, consulta Creare un flusso di modifiche. Per continuare con i passaggi successivi, devi avere un database Spanner con un flusso di modifiche configurato.

Concedere privilegi di controllo dell'accesso granulari

Se prevedi che il job Dataflow verrà eseguito da utenti di controllo dell'accesso granulare, assicura che agli utenti sia concesso l'accesso a un database ruolo con il privilegio SELECT per le modifiche in tempo reale e EXECUTE per la funzione con valori di tabella delle modifiche in tempo reale. Assicurati inoltre che specifica il ruolo del database nella configurazione SpannerIO o il modello flessibile di Dataflow.

Per ulteriori informazioni, consulta Informazioni sul controllo dell'accesso granulare.

Aggiungi il connettore SpannerIO come dipendenza

Il connettore SpannerIO di Apache Beam include la complessità dell'utilizzo dei flussi di modifiche direttamente tramite l'API Cloud Spanner, inviando una PCollection di record di dati relativi alle modifiche in tempo reale nelle fasi successive della pipeline.

Questi oggetti possono essere utilizzati in altre fasi della pipeline Dataflow dell'utente. L'integrazione delle modifiche in tempo reale fa parte del connettore SpannerIO. Per poter utilizzare il connettore SpannerIO, devi aggiungere la dipendenza al file pom.xml:

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
  <version>${beam-version}</version> <!-- available from version 2.38.0 -->
</dependency>

crea un database di metadati

Il connettore deve tenere traccia di ogni partizione quando esegue nella pipeline Apache Beam. Conserva questi metadati in uno Spanner creata dal connettore durante l'inizializzazione. Devi specificare in cui verrà creata la tabella durante la configurazione di rete.

Come descritto nella sezione Best practice per gli stream di modifiche, consigliamo di creare un nuovo database per questo scopo, anziché consentire per utilizzare il database della tua applicazione al fine di archiviare la relativa tabella dei metadati.

Il proprietario di un job Dataflow che utilizza SpannerIO deve disporre dei seguenti IAM autorizzazioni impostate con questo database di metadati:

  • spanner.databases.updateDdl
  • spanner.databases.beginReadOnlyTransaction
  • spanner.databases.beginOrRollbackReadWriteTransaction
  • spanner.databases.read
  • spanner.databases.select
  • spanner.databases.write
  • spanner.sessions.create
  • spanner.sessions.get

Configura il connettore

Il connettore di modifiche in tempo reale di Spanner può essere configurato come segue:

SpannerConfig spannerConfig = SpannerConfig
  .create()
  .withProjectId("my-project-id")
  .withInstanceId("my-instance-id")
  .withDatabaseId("my-database-id")
  .withDatabaseRole("my-database-role");    // Needed for fine-grained access control only

Timestamp startTime = Timestamp.now();
Timestamp endTime = Timestamp.ofTimeSecondsAndNanos(
   startTime.getSeconds() + (10 * 60),
   startTime.getNanos()
);

SpannerIO
  .readChangeStream()
  .withSpannerConfig(spannerConfig)
  .withChangeStreamName("my-change-stream")
  .withMetadataInstance("my-meta-instance-id")
  .withMetadataDatabase("my-meta-database-id")
  .withMetadataTable("my-meta-table-name")
  .withRpcPriority(RpcPriority.MEDIUM)
  .withInclusiveStartAt(startTime)
  .withInclusiveEndAt(endTime);

Di seguito sono riportate le descrizioni delle opzioni di readChangeStream():

Configurazione Spanner (obbligatoria)

Utilizzato per configurare il progetto, l'istanza e il database in cui è stato creato il flusso di modifiche e da cui deve essere eseguita la query. Inoltre, facoltativamente, specifica anche il ruolo del database da utilizzare quando IAM L'entità che esegue il job Dataflow è un utente con controllo dell'accesso granulare. Il job assume questo ruolo di database per l'accesso al flusso di modifiche. Per Per ulteriori informazioni, consulta Informazioni sul controllo dell'accesso granulare.

Nome flusso di modifiche (obbligatorio)

Questo nome identifica in modo univoco il flusso di modifiche. Il nome qui deve essere lo stesso utilizzato durante la creazione dell'elemento.

ID istanza metadati (facoltativo)

Questa è l'istanza in cui archiviare i metadati utilizzati dal connettore per controllare il consumo dei dati dell'API Change Stream.

ID database dei metadati (obbligatorio)

Si tratta del database in cui archiviare i metadati utilizzati dal connettore per controllare il consumo dei dati dell'API Change Stream.

Nome tabella metadati (facoltativo)

Dovrebbe essere utilizzato solo durante l'aggiornamento di una pipeline esistente.

Si tratta del nome della tabella di metadati preesistente che deve essere utilizzato dalla di rete. Viene utilizzato dal connettore per archiviare i metadati il consumo dei dati dell'API Change Stream. Se questa opzione è omesso, Spanner crea una nuova tabella con un valore all'inizializzazione del connettore.

Priorità RPC (facoltativo)

La richiesta priorità utilizzata per le query di modifiche in tempo reale. Se questo parametro viene omesso, verrà utilizzato high priority.

InclusiveStartAt (obbligatorio)

Le modifiche apportate al timestamp specificato vengono restituite al chiamante.

InclusiveEndAt (facoltativo)

Le modifiche apportate fino al timestamp specificato vengono restituite al chiamante. Se questo parametro viene omesso, le modifiche verranno applicate a tempo indeterminato.

Aggiungi trasformazioni e sink per elaborare i dati delle modifiche

Una volta completati i passaggi precedenti, il connettore SpannerIO configurato è pronto per emettere una PCollection di oggetti DataChangeRecord. Consulta Esempi di trasformazioni e sink per diversi esempi configurazioni della pipeline che elaborano i flussi di dati in vari modi.

Tieni presente che i record di modifiche in tempo reale emessi dal connettore SpannerIO non sono ordinati. Questo perché PCollection non fornisce alcuna garanzia di ordinazione. Se hai bisogno di un flusso ordinato, devi raggruppare e ordinare i record come trasformazioni nelle pipeline: consulta Esempio: ordine per chiave. Puoi estendere questo esempio per ordinare i record in base a qualsiasi campo dei record, ad esempio in base agli ID transazione.

Esempi di trasformazioni e sink

Puoi definire le tue trasformazioni e specificare i sink in cui scrivere i dati. La documentazione di Apache Beam fornisce una miriade di trasformazioni che possono essere applicate, nonché pronte per l'uso dei connettori I/O per scrivere i dati in sistemi esterni.

Esempio: ordine per chiave

Questo esempio di codice emette record delle modifiche dei dati ordinati per timestamp di commit e raggruppati per chiavi primarie utilizzando il connettore Dataflow.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new BreakRecordByModFn()))
  .apply(ParDo.of(new KeyByIdFn()))
  .apply(ParDo.of(new BufferKeyUntilOutputTimestamp()))
  // Subsequent processing goes here

Questo esempio di codice utilizza stati e timer per eseguire il buffering dei record per ogni chiave e imposta la scadenza del timer su un valore futuro configurato dall'utente T (definito nella funzione BufferKeyUntilOutputTimestamp). Quando la filigrana Dataflow passa il tempo T, questo codice fa il flush di tutti i record nel buffer con timestamp inferiore a T, ordina questi record in base al timestamp di commit e restituisce una coppia chiave-valore in cui:

  • La chiave è la chiave di input, ovvero la chiave primaria sottoposta ad hashing in un array di bucket di dimensioni 1000.
  • Il valore corrisponde ai record delle modifiche ai dati ordinati memorizzati nel buffer per la chiave.

Per ogni token offriamo le seguenti garanzie:

  • È garantito che i timer si attivino in base al timestamp della scadenza.
  • È garantito che le fasi downstream ricevano gli elementi nello stesso ordine in cui sono stati prodotti.

Ad esempio, supponiamo che per una chiave con valore 100 il timer si attivi rispettivamente a T1 e T10, producendo un bundle di record delle modifiche dei dati a ogni timestamp. Poiché i record delle modifiche dei dati inviati alle T1 sono stati generati prima di quelli generati alle T10, è garantito che anche i record delle modifiche dei dati inviati alle T1 vengano ricevuti nella fase successiva prima che i record delle modifiche dei dati inviati alle T10. Questo meccanismo ci aiuta a garantire un ordine rigoroso del timestamp di commit per chiave primaria per l'elaborazione downstream.

Questo processo si ripeterà fino al termine della pipeline e fino all'elaborazione di tutti i record delle modifiche dei dati (oppure si ripeterà all'infinito se non è specificata un'ora di fine).

Tieni presente che questo esempio di codice utilizza stati e timer, anziché finestre, per eseguire l'ordinamento per chiave. La motivazione è che le finestre non sono garantite da elaborare in ordine. Ciò significa che le finestre meno recenti possono essere elaborate in un secondo momento rispetto a quelle più recenti, il che potrebbe comportare la mancata elaborazione degli ordini.

BreakRecordByModFn

Ogni record di modifiche dei dati può contenere diverse mod. Ogni mod rappresenta un inserimento, un aggiornamento o un'eliminazione in un singolo valore di chiave primaria. Questa funzione suddivide ogni record di modifiche dei dati in record separati delle modifiche dei dati, uno per mod.

private static class BreakRecordByModFn extends DoFn<DataChangeRecord,
                                                     DataChangeRecord>  {
  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record, OutputReceiver<DataChangeRecord>
    outputReceiver) {
    record.getMods().stream()
      .map(
          mod ->
              new DataChangeRecord(
                  record.getPartitionToken(),
                  record.getCommitTimestamp(),
                  record.getServerTransactionId(),
                  record.isLastRecordInTransactionInPartition(),
                  record.getRecordSequence(),
                  record.getTableName(),
                  record.getRowType(),
                  Collections.singletonList(mod),
                  record.getModType(),
                  record.getValueCaptureType(),
                  record.getNumberOfRecordsInTransaction(),
                  record.getNumberOfPartitionsInTransaction(),
                  record.getTransactionTag(),
                  record.isSystemTransaction(),
                  record.getMetadata()))
      .forEach(outputReceiver::output);
  }
}

KeyByIdFn

Questa funzione accetta un valore DataChangeRecord e restituisce un valore DataChangeRecord codificato dalla chiave primaria Spanner sottoposta ad hashing in base a un valore intero.

private static class KeyByIdFn extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>>  {
  // NUMBER_OF_BUCKETS should be configured by the user to match their key cardinality
  // Here, we are choosing to hash the Spanner primary keys to a bucket index, in order to have a deterministic number
  // of states and timers for performance purposes.
  // Note that having too many buckets might have undesirable effects if it results in a low number of records per bucket
  // On the other hand, having too few buckets might also be problematic, since many keys will be contained within them.
  private static final int NUMBER_OF_BUCKETS = 1000;

  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record,
      OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
    int hashCode = (int) record.getMods().get(0).getKeysJson().hashCode();
    // Hash the received keys into a bucket in order to have a
    // deterministic number of buffers and timers.
    String bucketIndex = String.valueOf(hashCode % NUMBER_OF_BUCKETS);

    outputReceiver.output(KV.of(bucketIndex, record));
  }
}

BufferKeyUntilOutputTimestamp

I timer e i buffer sono per chiave. Questa funzione esegue il buffer di ogni record delle modifiche dei dati finché la filigrana non supera il timestamp in cui vogliamo restituire i record delle modifiche dei dati presenti nel buffer.

Questo codice utilizza un timer di loop per determinare quando svuotare il buffer:

  1. Quando rileva per la prima volta un record di modifiche dati per una chiave, imposta il timer in modo che si attivi in corrispondenza del timestamp di commit del record di modifiche dati + incrementIntervalSeconds (opzione configurabile dall'utente).
  2. Quando il timer si attiva, aggiunge a recordsToOutput tutti i record delle modifiche dei dati nel buffer con un timestamp inferiore alla scadenza del timer. Se nel buffer sono presenti record delle modifiche dei dati il cui timestamp è maggiore o uguale alla data e ora di scadenza del timer, questi vengono aggiunti di nuovo nel buffer anziché inviarli. Quindi, imposta il timer successivo sulla data di scadenza del timer attuale più incrementIntervalInSeconds.
  3. Se recordsToOutput non è vuoto, la funzione ordina i record delle modifiche dei dati in recordsToOutput in base al timestamp di commit e all'ID transazione e poi li restituisce.
private static class BufferKeyUntilOutputTimestamp extends
    DoFn<KV<String, DataChangeRecord>, KV<String, Iterable<DataChangeRecord>>>  {
  private static final Logger LOG =
      LoggerFactory.getLogger(BufferKeyUntilOutputTimestamp.class);

  private final long incrementIntervalInSeconds = 2;

  private BufferKeyUntilOutputTimestamp(long incrementIntervalInSeconds) {
    this.incrementIntervalInSeconds = incrementIntervalInSeconds;
  }

  @SuppressWarnings("unused")
  @TimerId("timer")
  private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @StateId("buffer")
  private final StateSpec<BagState<DataChangeRecord>> buffer = StateSpecs.bag();

  @StateId("keyString")
  private final StateSpec<ValueState<String>> keyString =
      StateSpecs.value(StringUtf8Coder.of());

  @ProcessElement
  public void process(
      @Element KV<String, DataChangeRecord> element,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @TimerId("timer") Timer timer,
      @StateId("keyString") ValueState<String> keyString) {
    buffer.add(element.getValue());

    // Only set the timer if this is the first time we are receiving a data change
    // record with this key.
    String elementKey = keyString.read();
    if (elementKey == null) {
      Instant commitTimestamp =
          new Instant(element.getValue().getCommitTimestamp().toSqlTimestamp());
      Instant outputTimestamp =
          commitTimestamp.plus(Duration.standardSeconds(incrementIntervalInSeconds));
      timer.set(outputTimestamp);
      keyString.write(element.getKey());
    }
  }

  @OnTimer("timer")
  public void onExpiry(
      OnTimerContext context,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @TimerId("timer") Timer timer,
      @StateId("keyString") ValueState<String> keyString) {
    if (!buffer.isEmpty().read()) {
      String elementKey = keyString.read();

      final List<DataChangeRecord> records =
          StreamSupport.stream(buffer.read().spliterator(), false)
              .collect(Collectors.toList());
      buffer.clear();

      List<DataChangeRecord> recordsToOutput = new ArrayList<>();
      for (DataChangeRecord record : records) {
        Instant recordCommitTimestamp =
            new Instant(record.getCommitTimestamp().toSqlTimestamp());
        final String recordString =
            record.getMods().get(0).getNewValuesJson().isEmpty()
                ? "Deleted record"
                : record.getMods().get(0).getNewValuesJson();
        // When the watermark passes time T, this means that all records with
        // event time < T have been processed and successfully committed. Since the
        // timer fires when the watermark passes the expiration time, we should
        // only output records with event time < expiration time.
        if (recordCommitTimestamp.isBefore(context.timestamp())) {
          LOG.info(
             "Outputting record with key {} and value {} at expiration " +
             "timestamp {}",
              elementKey,
              recordString,
              context.timestamp().toString());
          recordsToOutput.add(record);
        } else {
          LOG.info(
              "Expired at {} but adding record with key {} and value {} back to " +
              "buffer due to commit timestamp {}",
              context.timestamp().toString(),
              elementKey,
              recordString,
              recordCommitTimestamp.toString());
          buffer.add(record);
        }
      }

      // Output records, if there are any to output.
      if (!recordsToOutput.isEmpty()) {
        // Order the records in place, and output them. The user would need
        // to implement DataChangeRecordComparator class that sorts the
        // data change records by commit timestamp and transaction ID.
        Collections.sort(recordsToOutput, new DataChangeRecordComparator());
        context.outputWithTimestamp(
            KV.of(elementKey, recordsToOutput), context.timestamp());
        LOG.info(
            "Expired at {}, outputting records for key {}",
            context.timestamp().toString(),
            elementKey);
      } else {
        LOG.info("Expired at {} with no records", context.timestamp().toString());
      }
    }

    Instant nextTimer = context.timestamp().plus(Duration.standardSeconds(incrementIntervalInSeconds));
    if (buffer.isEmpty() != null && !buffer.isEmpty().read()) {
      LOG.info("Setting next timer to {}", nextTimer.toString());
      timer.set(nextTimer);
    } else {
      LOG.info(
          "Timer not being set since the buffer is empty: ");
      keyString.clear();
    }
  }
}

Transazioni ordini

Questa pipeline può essere modificata in modo da ordinare per ID transazione e timestamp di commit. Per farlo, inserisci i record nel buffer per ogni coppia ID transazione / timestamp di commit, anziché per ogni chiave Spanner. Questa operazione richiede la modifica del codice in KeyByIdFn.

Esempio: combinare transazioni

Questo esempio di codice legge i record delle modifiche dei dati, assembla tutti i record delle modifiche dei dati appartenenti alla stessa transazione in un singolo elemento e genera questo elemento. Tieni presente che le transazioni restituite da questo codice campione non sono ordinate in base al timestamp di commit.

Questo esempio di codice utilizza i buffer per combinare le transazioni dai record delle modifiche dei dati. Quando riceve per la prima volta un record delle modifiche dei dati appartenente a una transazione, legge il campo numberOfRecordsInTransaction nel record delle modifiche dei dati, che descrive il numero previsto di record delle modifiche dei dati appartenenti a quella transazione. Memorizza nel buffer i record delle modifiche dei dati appartenenti a quella transazione finché il numero di record presenti nel buffer non corrisponde a numberOfRecordsInTransaction, quindi genera i record delle modifiche dei dati in bundle.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new KeyByTransactionIdFn()))
  .apply(ParDo.of(new TransactionBoundaryFn()))
  // Subsequent processing goes here

KeyByTransactionIdFn

Questa funzione acquisisce un valore DataChangeRecord e restituisce un valore DataChangeRecord specificato dall'ID transazione.

private static class KeyByTransactionIdFn extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>>  {
  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record,
      OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
    outputReceiver.output(KV.of(record.getServerTransactionId(), record));
  }
}

TransactionBoundaryFn

TransactionBoundaryFn buffer hanno ricevuto coppie chiave-valore di {TransactionId, DataChangeRecord} da KeyByTransactionIdFn e li inserisce nel buffer in gruppi basati su TransactionId. Quando il numero di record contenuti nel buffer è uguale al numero di record contenuti nei l'intera transazione, questa funzione ordina gli oggetti DataChangeRecord nel gruppo per sequenza di record e restituisce una coppia chiave-valore {CommitTimestamp, TransactionId} e Iterable<DataChangeRecord>.

In questo caso supponiamo che SortKey sia una classe definita dall'utente che rappresenta una coppia {CommitTimestamp, TransactionId}. Consulta l'implementazione di esempio per SortKey.

private static class TransactionBoundaryFn extends DoFn<KV<String, DataChangeRecord>, KV<SortKey, Iterable<DataChangeRecord>>>  {
  @StateId("buffer")
  private final StateSpec<BagState<DataChangeRecord>> buffer = StateSpecs.bag();

  @StateId("count")
  private final StateSpec<ValueState<Integer>> countState = StateSpecs.value();

  @ProcessElement
  public void process(
      ProcessContext context,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @StateId("count") ValueState<Integer> countState) {
    final KV<String, DataChangeRecord> element = context.element();
    final DataChangeRecord record = element.getValue();

    buffer.add(record);
    int count = (countState.read() != null ? countState.read() : 0);
    count = count + 1;
    countState.write(count);

    if (count == record.getNumberOfRecordsInTransaction()) {
      final List<DataChangeRecord> sortedRecords =
          StreamSupport.stream(buffer.read().spliterator(), false)
              .sorted(Comparator.comparing(DataChangeRecord::getRecordSequence))
              .collect(Collectors.toList());

      final Instant commitInstant =
          new Instant(sortedRecords.get(0).getCommitTimestamp().toSqlTimestamp()
              .getTime());
      context.outputWithTimestamp(
          KV.of(
              new SortKey(sortedRecords.get(0).getCommitTimestamp(),
                          sortedRecords.get(0).getServerTransactionId()),
              sortedRecords),
          commitInstant);
      buffer.clear();
      countState.clear();
    }
  }
}

Esempio: filtrare per tag transazione

Quando una transazione che modifica i dati utente viene codificata, il tag corrispondente e il relativo tipo vengono archiviati come parte di DataChangeRecord. Questi esempi mostrano come filtrare i record di modifiche in tempo reale in base ai tag di transazione definiti dall'utente e ai tag di sistema:

Filtro dei tag definito dall'utente per my-tx-tag:

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(Filter.by(record ->
           !record.isSystemTransaction()
           && record.getTransactionTag().equalsIgnoreCase("my-tx-tag")))
  // Subsequent processing goes here

Filtro dei tag di sistema/controllo TTL:

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(Filter.by(record ->
           record.isSystemTransaction()
           && record.getTransactionTag().equals("RowDeletionPolicy")))
  // Subsequent processing goes here

Esempio: recupera riga intera

Questo esempio funziona con una tabella Spanner denominata Singer che ha la definizione seguente:

CREATE TABLE Singers (
  SingerId INT64 NOT NULL,
  FirstName STRING(1024),
  LastName STRING(1024)
) PRIMARY KEY (SingerId);

Nella modalità di acquisizione del valore predefinita di OLD_AND_NEW_VALUES delle modifiche in tempo reale, in caso di aggiornamento di una riga di Spanner, il record delle modifiche dei dati ricevuto conterrà solo le colonne che sono state modificate. Le colonne monitorate ma non modificate non verranno incluse nel record. La chiave primaria della mod può essere utilizzata per leggere uno snapshot di Spanner al timestamp di commit del record della modifica dei dati per recuperare le colonne non modificate o persino recuperare l'intera riga.

Tieni presente che, affinché lo snapshot venga letto, potrebbe essere necessario modificare il criterio di conservazione del database impostandolo su un valore maggiore o uguale a quello del criterio di conservazione delle modifiche in tempo reale.

Tieni inoltre presente che l'utilizzo del tipo di acquisizione del valore NEW_ROW è il modo consigliato e più efficiente per farlo, poiché restituisce tutte le colonne monitorate della riga per impostazione predefinita e non richiede un'ulteriore lettura di snapshot in Spanner.

SpannerConfig spannerConfig = SpannerConfig
   .create()
   .withProjectId("my-project-id")
   .withInstanceId("my-instance-id")
   .withDatabaseId("my-database-id")
   .withDatabaseRole("my-database-role");   // Needed for fine-grained access control only

pipeline
   .apply(SpannerIO
       .readChangeStream()
       .withSpannerConfig(spannerConfig)
       // Assume we have a change stream "my-change-stream" that watches Singers table.
       .withChangeStreamName("my-change-stream")
       .withMetadataInstance("my-metadata-instance-id")
       .withMetadataDatabase("my-metadata-database-id")
       .withInclusiveStartAt(Timestamp.now()))
   .apply(ParDo.of(new ToFullRowJsonFn(spannerConfig)))
   // Subsequent processing goes here

ToFullRowJsonFn

Questa trasformazione eseguirà una lettura inattiva al timestamp di commit di ogni record ricevuto e mapperà l'intera riga a JSON.

public class ToFullRowJsonFn extends DoFn<DataChangeRecord, String> {
 // Since each instance of this DoFn will create its own session pool and will
 // perform calls to Spanner sequentially, we keep the number of sessions in
 // the pool small. This way, we avoid wasting resources.
 private static final int MIN_SESSIONS = 1;
 private static final int MAX_SESSIONS = 5;
 private final String projectId;
 private final String instanceId;
 private final String databaseId;

 private transient DatabaseClient client;
 private transient Spanner spanner;

 public ToFullRowJsonFn(SpannerConfig spannerConfig) {
   this.projectId = spannerConfig.getProjectId().get();
   this.instanceId = spannerConfig.getInstanceId().get();
   this.databaseId = spannerConfig.getDatabaseId().get();
 }

 @Setup
 public void setup() {
   SessionPoolOptions sessionPoolOptions = SessionPoolOptions
      .newBuilder()
      .setMinSessions(MIN_SESSIONS)
      .setMaxSessions(MAX_SESSIONS)
      .build();
   SpannerOptions options = SpannerOptions
       .newBuilder()
       .setProjectId(projectId)
       .setSessionPoolOption(sessionPoolOptions)
       .build();
   DatabaseId id = DatabaseId.of(projectId, instanceId, databaseId);
   spanner = options.getService();
   client = spanner.getDatabaseClient(id);
 }

 @Teardown
 public void teardown() {
   spanner.close();
 }

 @ProcessElement
 public void process(
   @Element DataChangeRecord element,
   OutputReceiver<String> output) {
   com.google.cloud.Timestamp commitTimestamp = element.getCommitTimestamp();
   element.getMods().forEach(mod -> {
     JSONObject keysJson = new JSONObject(mod.getKeysJson());
     JSONObject newValuesJson = new JSONObject(mod.getNewValuesJson());
     ModType modType = element.getModType();
     JSONObject jsonRow = new JSONObject();
     long singerId = keysJson.getLong("SingerId");
     jsonRow.put("SingerId", singerId);
     if (modType == ModType.INSERT) {
       // For INSERT mod, get non-primary key columns from mod.
       jsonRow.put("FirstName", newValuesJson.get("FirstName"));
       jsonRow.put("LastName", newValuesJson.get("LastName"));
     } else if (modType == ModType.UPDATE) {
       // For UPDATE mod, get non-primary key columns by doing a snapshot read using the primary key column from mod.
       try (ResultSet resultSet = client
         .singleUse(TimestampBound.ofReadTimestamp(commitTimestamp))
         .read(
           "Singers",
           KeySet.singleKey(com.google.cloud.spanner.Key.of(singerId)),
             Arrays.asList("FirstName", "LastName"))) {
         if (resultSet.next()) {
           jsonRow.put("FirstName", resultSet.isNull("FirstName") ?
             JSONObject.NULL : resultSet.getString("FirstName"));
           jsonRow.put("LastName", resultSet.isNull("LastName") ?
             JSONObject.NULL : resultSet.getString("LastName"));
         }
       }
     } else {
       // For DELETE mod, there is nothing to do, as we already set SingerId.
     }

     output.output(jsonRow.toString());
   });
 }
}

Questo codice crea un client di database Spanner per eseguire il recupero dell'intera riga e configura il pool di sessioni in modo che abbia poche sessioni, eseguendo letture in sequenza in un'istanza di ToFullRowJsonFn. Dataflow garantisce di generare molte istanze di questa funzione, ciascuna con il proprio pool di client.

Esempio: da Spanner a Pub/Sub

In questo scenario, il chiamante trasmette i flussi di record a Pub/Sub come il più rapidamente possibile, senza alcun raggruppamento o aggregazione. Questo è un buon adatti all'attivazione dell'elaborazione downstream, come l'inserimento di flussi di tutte le nuove righe inserito in una tabella Spanner Pub/Sub per ulteriori informazioni e l'elaborazione dei dati.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(MapElements.into(TypeDescriptors.strings()).via(Object::toString))
  .apply(PubsubIO.writeStrings().to("my-topic"));

Tieni presente che il sink Pub/Sub può essere configurato in modo da garantire la semantica exactly-once.

Esempio: da Spanner a Cloud Storage

In questo scenario, il chiamante raggruppa tutti i record all'interno di una determinata finestra e salva il gruppo in file Cloud Storage separati. È una soluzione ideale per l'analisi e l'archiviazione point-in-time, indipendente dal periodo di conservazione di Spanner.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(MapElements.into(TypeDescriptors.strings()).via(Object::toString))
  .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
  .apply(TextIO
    .write()
    .to("gs://my-bucket/change-stream-results-")
    .withSuffix(".txt")
    .withWindowedWrites()
    .withNumShards(1));

Tieni presente che il sink di Cloud Storage fornisce semantica "Almeno una volta" per impostazione predefinita. Con un'elaborazione aggiuntiva, può essere modificata in modo che abbia una semantica "exactly-once".

Forniamo anche un modello Dataflow per questo caso d'uso: vedi Connettere le modifiche in tempo reale a Cloud Storage.

Esempio: Spanner a BigQuery (tabella di registro)

Qui, i flussi del chiamante modificano i record in BigQuery. Ogni record delle modifiche ai dati viene riportato in una singola riga in BigQuery. È una buona soluzione per l'analisi. Questo codice utilizza le funzioni definite in precedenza, nella sezione Recupera riga completa, per recuperare l'intera riga del record e scriverla in BigQuery.

SpannerConfig spannerConfig = SpannerConfig
  .create()
  .withProjectId("my-project-id")
  .withInstanceId("my-instance-id")
  .withDatabaseId("my-database-id")
  .withDatabaseRole("my-database-role");   // Needed for fine-grained access control only

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(spannerConfig)
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new ToFullRowJsonFn(spannerConfig)))
  .apply(BigQueryIO
    .<String>write()
    .to("my-bigquery-table")
    .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
    .withWriteDisposition(Write.WriteDisposition.WRITE_APPEND)
    .withSchema(new TableSchema().setFields(Arrays.asList(
      new TableFieldSchema()
        .setName("SingerId")
        .setType("INT64")
        .setMode("REQUIRED"),
      new TableFieldSchema()
        .setName("FirstName")
        .setType("STRING")
        .setMode("REQUIRED"),
      new TableFieldSchema()
        .setName("LastName")
        .setType("STRING")
        .setMode("REQUIRED")
    )))
    .withAutoSharding()
    .optimizedWrites()
    .withFormatFunction((String element) -> {
      ObjectMapper objectMapper = new ObjectMapper();
      JsonNode jsonNode = null;
      try {
        jsonNode = objectMapper.readTree(element);
      } catch (IOException e) {
        e.printStackTrace();
      }
      return new TableRow()
        .set("SingerId", jsonNode.get("SingerId").asInt())
        .set("FirstName", jsonNode.get("FirstName").asText())
        .set("LastName", jsonNode.get("LastName").asText());
    }
  )
);

Tieni presente che il sink BigQuery fornisce la semantica "Almeno una volta" per impostazione predefinita. Con un'elaborazione aggiuntiva, può essere modificata in modo che abbia una semantica "exactly-once".

Forniamo anche un modello Dataflow per questo caso d'uso; consulta Connettere le modifiche in tempo reale a BigQuery.

Monitora una pipeline

Sono disponibili due classi di metriche per monitorare una pipeline Dataflow in modalità flusso di modifiche.

Metriche Dataflow standard

Dataflow fornisce diverse metriche per garantire l'integrità del job, come l'aggiornamento dei dati, il ritardo di sistema, la velocità effettiva del job, l'utilizzo della CPU da parte dei worker e altro ancora. Puoi trovare ulteriori informazioni in Utilizzo di Monitoring per le pipeline Dataflow.

Per le pipeline di modifiche in tempo reale, ci sono due metriche principali che devono essere prese in considerazione: la latenza del sistema e l'aggiornamento dei dati.

La latenza del sistema indicherà l'attuale durata massima (in secondi) durante il quale un dato viene elaborato o è in attesa di elaborazione.

L'aggiornamento dei dati mostrerà il periodo di tempo che intercorre tra adesso (in tempo reale) e la filigrana di output. La filigrana di output del tempo T indica che tutti gli elementi con un'ora dell'evento (strettamente) precedente a T sono stati elaborati per il calcolo. In altre parole, l'aggiornamento dei dati misura l'attualità della pipeline per quanto riguarda l'elaborazione degli eventi ricevuti.

Se la pipeline non dispone di risorse sufficienti, puoi vedere questo effetto in queste due metriche. La latenza del sistema aumenterà, perché gli elementi devono attendere più a lungo prima di essere elaborati. Anche l'aggiornamento dei dati aumenterà, perché la pipeline non sarà in grado di stare al passo con la quantità di dati ricevuti.

Metriche personalizzate delle modifiche in tempo reale

Queste metriche sono esposte in Cloud Monitoring e includono:

  • Latenza in bucket (istogramma) tra un record di cui viene eseguito il commit in Spanner e l'emissione in una PCollection da parte del connettore. Questa metrica può essere utilizzata per visualizzare eventuali problemi di prestazioni (latenza) con la pipeline.
  • Numero totale di record di dati letti. Si tratta di un'indicazione generale del numero di record emessi dal connettore. Questo numero dovrebbe essere in costante aumento, rispecchiando la tendenza delle scritture nel database Spanner sottostante.
  • Numero di partizioni attualmente in lettura. Le partizioni devono essere sempre lette. Se questo numero è zero, indica che si è verificato un errore nella pipeline.
  • Numero totale di query eseguite durante l'esecuzione del connettore. Questa è un'indicazione generale delle query di modifiche in tempo reale effettuate all'istanza Spanner durante l'esecuzione della pipeline. Può essere utilizzato per ottenere una stima del carico dal connettore al database Spanner.

Aggiorna una pipeline esistente

È possibile aggiornare una pipeline in esecuzione che utilizza SpannerIO per elaborare modifiche in tempo reale se la compatibilità del job controlli superati. Da fare Devi impostare esplicitamente il parametro del nome della tabella dei metadati il nuovo job quando lo aggiorni. Utilizza il valore dell'attributo metadataTable dal job che stai aggiornando.

Se utilizzi un modello Dataflow fornito da Google, imposta il valore nome della tabella utilizzando il parametro spannerMetadataTableName. Puoi inoltre modificare del job esistente per utilizzare esplicitamente la tabella dei metadati con il metodo withMetadataTable(your-metadata-table-name) pollici la configurazione del connettore. Al termine, puoi seguire le le istruzioni in Avviare il dispositivo sostitutivo job dal documentazione di Dataflow per aggiornare un job in esecuzione.

Best practice per le modifiche in tempo reale e Dataflow

Di seguito sono riportate alcune best practice per la creazione di connessioni alle modifiche in tempo reale con Dataflow.

Usa un database di metadati separato

Consigliamo di creare un database separato da utilizzare per il connettore SpannerIO per l'archiviazione dei metadati, anziché configurarlo per l'uso del database dell'applicazione.

Per ulteriori informazioni, vedi Valuta l'utilizzo di un database di metadati separato.

Dimensione del cluster

Una regola empirica per il numero iniziale di worker in un Il job di modifiche in tempo reale Spanner è di un worker per 1000 scritture secondo. Tieni presente che questa stima può variare a seconda da diversi fattori, come le dimensioni di ogni transazione, il numero di modifiche vengono generati da una singola transazione e da altre trasformazioni, e le aggregazioni o i sink usati nella pipeline.

Dopo il provisioning iniziale, è importante tenere traccia delle metriche menzionate in Monitora una pipeline, per assicurarti che la pipeline sia integro. Consigliamo di sperimentare con un modello le dimensioni del pool di worker e monitorare che gestisce il carico, aumentando il numero di nodi se necessario. La L'utilizzo della CPU è una metrica chiave per verificare se il carico è corretto e se un numero maggiore di nodi sono necessarie.

Limitazioni note

Scalabilità automatica

Supporto della scalabilità automatica per tutte le pipeline che includono SpannerIO.readChangeStream richiede Apache Beam 2.39.0 o in alto.

Se utilizzi una versione Apache Beam precedente a 2.39.0, le pipeline che includono SpannerIO.readChangeStream devono specificare esplicitamente la scalabilità automatica dell'algoritmo come NONE, come descritto Scalabilità automatica orizzontale.

A scalare manualmente una pipeline Dataflow anziché utilizzare la scalabilità automatica, consulta Scalabilità manuale di una pipeline in modalità flusso.

Corridore V2

Il connettore di modifiche in tempo reale di Spanner richiede Dataflow Runner V2. Deve essere specificato manualmente durante l'esecuzione, altrimenti verrà visualizzato un errore lanciate. Puoi specificare Runner V2 configurando il job con --experiments=use_unified_worker,use_runner_v2.

Snapshot

Il connettore di modifiche in tempo reale di Spanner non supporta Snapshot di Dataflow.

Svuotamento in corso

Il connettore di modifiche in tempo reale di Spanner non supporta il svuotamento di un job. È possibile annullare solo un job esistente.

Puoi anche aggiornare una pipeline esistente. senza doverla fermare.

OpenCensus

Per utilizzare OpenCensus per monitorare la pipeline, specifica la versione 0.28.3 o versioni successive.

NullPointerException all'avvio della pipeline

Un bug nella versione 2.38.0 di Apache Beam può causare un errore NullPointerException all'avvio della pipeline in determinate condizioni. Ciò impedirebbe l'avvio del job e visualizzerà invece questo messaggio di errore:

java.lang.NullPointerException: null value in entry: Cloud Storage_PROJECT_ID=null

Per risolvere il problema, utilizza Apache Beam versione 2.39.0 o successiva oppure specifica manualmente la versione di beam-sdks-java-core sotto forma di 2.37.0:

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-core</artifactId>
  <version>2.37.0</version>
</dependency>

Ulteriori informazioni