Creare connessioni di modifiche in tempo reale utilizzando Dataflow

Questa pagina mostra come creare pipeline Dataflow che utilizzano e inoltrano dati di modifica di Spanner utilizzando flussi di modifiche. Puoi usare il codice di esempio in questa pagina per creare pipeline personalizzate.

Concetti principali

Di seguito sono riportati alcuni concetti fondamentali delle pipeline Dataflow per modifiche in tempo reale.

Dataflow

Dataflow è un servizio serverless, veloce e conveniente che supporta l'elaborazione in modalità flusso e batch. Offre la portabilità con job di elaborazione scritti utilizzando le librerie open source Apache Beam e automatizza il provisioning dell'infrastruttura e la gestione dei cluster. Dataflow fornisce flussi di dati quasi in tempo reale durante la lettura dai modifiche in tempo reale.

Puoi utilizzare Dataflow per utilizzare i flussi di modifiche di Spanner con il connettore SpannerIO, che offre un'astrazione rispetto all'API Spanner per eseguire query sui modifiche in tempo reale. Con questo connettore, non devi gestire il ciclo di vita della partizione modifiche in tempo reale, che è necessario quando utilizzi direttamente l'API Spanner. Il connettore fornisce un flusso di record delle modifiche dei dati in modo che tu possa concentrarti maggiormente sulla logica dell'applicazione e meno sui dettagli specifici dell'API e sul partizionamento dei flussi di modifiche dinamiche. Consigliamo di utilizzare il connettore SpannerIO anziché l'API Spanner nella maggior parte dei casi in cui è necessario leggere i dati delle modifiche in tempo reale.

I modelli Dataflow sono pipeline Dataflow predefinite che implementano casi d'uso comuni. Per una panoramica, consulta Modelli Dataflow.

Pipeline Dataflow

Una pipeline Dataflow in modifiche in tempo reale di Spanner è composta da quattro parti:

  1. Un database Spanner con un flusso di modifiche
  2. Il connettore SpannerIO
  3. Trasformazioni e sink definiti dall'utente
  4. Un writer I/O sink

immagine

Ciascuna di queste opzioni viene discussa più in dettaglio di seguito.

Modifiche in tempo reale Spanner

Per maggiori dettagli su come creare un flusso di modifiche, consulta Creare un flusso di modifiche.

Connettore SpannerIO Apache Beam

Si tratta del connettore SpannerIO descritto in precedenza. È un connettore di I/O di origine che invia un PCollection di record delle modifiche dei dati alle fasi successive della pipeline. L'ora dell'evento per ogni record di modifiche dei dati emesse sarà il timestamp di commit. Tieni presente che i record emessi sono non ordinati e che il connettore SpannerIO garantisce che non ci saranno record in ritardo.

Quando si lavora con le modifiche in tempo reale, Dataflow utilizza i checkpoint. Di conseguenza, ogni worker potrebbe attendere fino all'intervallo di checkpoint configurato per il buffering delle modifiche prima di inviare le modifiche per l'ulteriore elaborazione.

Trasformazioni definite dall'utente

Una trasformazione definita dall'utente consente a un utente di aggregare, trasformare o modificare i dati di elaborazione all'interno di una pipeline Dataflow. I casi d'uso comuni sono la rimozione di informazioni che consentono l'identificazione personale, il rispetto dei requisiti di formato dei dati a valle e l'ordinamento. Consulta la documentazione ufficiale di Apache Beam per la guida alla programmazione sulle transforms.

Writer I/O sink Apache Beam

Apache Beam contiene trasformazioni I/O integrate che possono essere utilizzate per scrivere da una pipeline Dataflow in un data sink come BigQuery. I data sink più comuni sono supportati in modo nativo.

Modelli Dataflow

I modelli Dataflow offrono un modo semplice per creare job Dataflow basati su immagini Docker predefinite per casi d'uso comuni tramite la console Google Cloud, l'interfaccia a riga di comando di Google Cloud o le chiamate API Rest.

Per le modifiche in tempo reale di Spanner, forniamo tre modelli flessibili di Dataflow:

Crea una pipeline Dataflow

Questa sezione illustra la configurazione iniziale del connettore e fornisce esempi per le integrazioni comuni con la funzionalità di modifiche in tempo reale di Spanner.

Per seguire questi passaggi, è necessario un ambiente di sviluppo Java per Dataflow. Per ulteriori informazioni, consulta Creare una pipeline Dataflow utilizzando Java.

Crea un flusso di modifiche

Per maggiori dettagli su come creare un flusso di modifiche, consulta Creare un flusso di modifiche. Per continuare con i passaggi successivi, devi avere un database Spanner con un flusso di modifiche configurato.

Concedere privilegi di controllo dell'accesso granulari

Se prevedi che il job Dataflow verrà eseguito da utenti di controllo dell'accesso granulare, assicurati che agli utenti sia concesso l'accesso a un ruolo di database con il privilegio SELECT sul flusso di modifiche e il privilegio EXECUTE sulla funzione con valore di tabella del flusso di modifiche. Assicurati inoltre che l'entità specifichi il ruolo del database nella configurazione SpannerIO o nel modello flessibile di Dataflow.

Per ulteriori informazioni, consulta Informazioni sul controllo dell'accesso granulare.

Aggiungi il connettore SpannerIO come dipendenza

Il connettore SpannerIO di Apache Beam include la complessità dell'utilizzo dei flussi di modifiche direttamente tramite l'API Cloud Spanner, inviando una PCollection di record di dati relativi alle modifiche in tempo reale nelle fasi successive della pipeline.

Questi oggetti possono essere utilizzati in altre fasi della pipeline Dataflow dell'utente. L'integrazione delle modifiche in tempo reale fa parte del connettore SpannerIO. Per poter utilizzare il connettore SpannerIO, devi aggiungere la dipendenza al file pom.xml:

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
  <version>${beam-version}</version> <!-- available from version 2.38.0 -->
</dependency>

crea un database di metadati

Il connettore deve tenere traccia di ogni partizione quando viene eseguita la pipeline Apache Beam. Conserva questi metadati in una tabella Spanner creata dal connettore durante l'inizializzazione. Devi specificare il database in cui verrà creata la tabella durante la configurazione del connettore.

Come descritto nelle Best practice per i flussi di modifiche, consigliamo di creare un nuovo database per questo scopo, anziché consentire al connettore di utilizzare il database dell'applicazione per archiviare la tabella dei metadati.

Il proprietario di un job Dataflow che utilizza il connettore SpannerIO deve avere le seguenti autorizzazioni IAM impostate con questo database di metadati:

  • spanner.databases.updateDdl
  • spanner.databases.beginReadOnlyTransaction
  • spanner.databases.beginOrRollbackReadWriteTransaction
  • spanner.databases.read
  • spanner.databases.select
  • spanner.databases.write
  • spanner.sessions.create
  • spanner.sessions.get

Configura il connettore

Il connettore di modifiche in tempo reale di Spanner può essere configurato come segue:

SpannerConfig spannerConfig = SpannerConfig
  .create()
  .withProjectId("my-project-id")
  .withInstanceId("my-instance-id")
  .withDatabaseId("my-database-id")
  .withDatabaseRole("my-database-role");    // Needed for fine-grained access control only

Timestamp startTime = Timestamp.now();
Timestamp endTime = Timestamp.ofTimeSecondsAndNanos(
   startTime.getSeconds() + (10 * 60),
   startTime.getNanos()
);

SpannerIO
  .readChangeStream()
  .withSpannerConfig(spannerConfig)
  .withChangeStreamName("my-change-stream")
  .withMetadataInstance("my-meta-instance-id")
  .withMetadataDatabase("my-meta-database-id")
  .withMetadataTable("my-meta-table-name")
  .withRpcPriority(RpcPriority.MEDIUM)
  .withInclusiveStartAt(startTime)
  .withInclusiveEndAt(endTime);

Di seguito sono riportate le descrizioni delle opzioni di readChangeStream():

Configurazione Spanner (obbligatoria)

Utilizzato per configurare il progetto, l'istanza e il database in cui è stato creato il flusso di modifiche e da cui deve essere eseguita la query. Inoltre, facoltativamente, specifica anche il ruolo del database da utilizzare quando l'entità IAM che esegue il job Dataflow è un utente con controllo dell'accesso dell'accesso granulare. Il job assume questo ruolo di database per l'accesso al flusso di modifiche. Per maggiori informazioni, consulta Informazioni sul controllo dell'accesso granulare.

Nome flusso di modifiche (obbligatorio)

Questo nome identifica in modo univoco il flusso di modifiche. Il nome qui deve essere lo stesso utilizzato durante la creazione dell'elemento.

ID istanza metadati (facoltativo)

Questa è l'istanza in cui archiviare i metadati utilizzati dal connettore per controllare il consumo dei dati dell'API Change Stream.

ID database dei metadati (obbligatorio)

Si tratta del database in cui archiviare i metadati utilizzati dal connettore per controllare il consumo dei dati dell'API Change Stream.

Nome tabella metadati (facoltativo)

Dovrebbe essere utilizzato solo durante l'aggiornamento di una pipeline esistente.

Si tratta del nome della tabella di metadati preesistente che deve essere utilizzato dal connettore. Viene utilizzato dal connettore per archiviare i metadati al fine di controllare il consumo dei dati dell'API Change Stream. Se questa opzione viene omessa, Spanner crea una nuova tabella con un nome generato all'inizializzazione del connettore.

Priorità RPC (facoltativo)

La priorità della richiesta da utilizzare per le query delle modifiche in tempo reale. Se questo parametro viene omesso, verrà utilizzato high priority.

InclusiveStartAt (obbligatorio)

Le modifiche apportate al timestamp specificato vengono restituite al chiamante.

InclusiveEndAt (facoltativo)

Le modifiche apportate fino al timestamp specificato vengono restituite al chiamante. Se questo parametro viene omesso, le modifiche verranno applicate a tempo indeterminato.

Aggiungi trasformazioni e sink per elaborare i dati delle modifiche

Una volta completati i passaggi precedenti, il connettore SpannerIO configurato è pronto per emettere una PCollection di oggetti DataChangeRecord. Consulta Esempi di trasformazioni e sink per diverse configurazioni di pipeline di esempio che elaborano i flussi di dati in vari modi.

Tieni presente che i record di modifiche in tempo reale emessi dal connettore SpannerIO non sono ordinati. Questo perché PCollection non fornisce alcuna garanzia di ordinazione. Se hai bisogno di un flusso ordinato, devi raggruppare e ordinare i record come trasformazioni nelle pipeline: consulta Esempio: ordine per chiave. Puoi estendere questo esempio per ordinare i record in base a qualsiasi campo dei record, ad esempio in base agli ID transazione.

Esempi di trasformazioni e sink

Puoi definire le tue trasformazioni e specificare i sink in cui scrivere i dati. La documentazione di Apache Beam fornisce una miriade di transforms che possono essere applicate, nonché pronte per l'uso dei connettori I/O per scrivere i dati in sistemi esterni.

Esempio: ordine per chiave

Questo esempio di codice emette record delle modifiche dei dati ordinati per timestamp di commit e raggruppati per chiavi primarie utilizzando il connettore Dataflow.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new BreakRecordByModFn()))
  .apply(ParDo.of(new KeyByIdFn()))
  .apply(ParDo.of(new BufferKeyUntilOutputTimestamp()))
  // Subsequent processing goes here

Questo esempio di codice utilizza stati e timer per eseguire il buffering dei record per ogni chiave e imposta la scadenza del timer su un valore futuro configurato dall'utente T (definito nella funzione BufferKeyUntilOutputTimestamp). Quando la filigrana Dataflow passa il tempo T, questo codice fa il flush di tutti i record nel buffer con timestamp inferiore a T, ordina questi record in base al timestamp di commit e restituisce una coppia chiave-valore in cui:

  • La chiave è la chiave di input, ovvero la chiave primaria sottoposta ad hashing in un array di bucket di dimensioni 1000.
  • Il valore corrisponde ai record delle modifiche ai dati ordinati memorizzati nel buffer per la chiave.

Per ogni token offriamo le seguenti garanzie:

  • È garantito che i timer si attivino in base al timestamp della scadenza.
  • È garantito che le fasi downstream ricevano gli elementi nello stesso ordine in cui sono stati prodotti.

Ad esempio, supponiamo che per una chiave con valore 100 il timer si attivi rispettivamente a T1 e T10, producendo un bundle di record delle modifiche dei dati a ogni timestamp. Poiché i record delle modifiche dei dati inviati alle T1 sono stati generati prima di quelli generati alle T10, è garantito che anche i record delle modifiche dei dati inviati alle T1 vengano ricevuti nella fase successiva prima che i record delle modifiche dei dati inviati alle T10. Questo meccanismo ci aiuta a garantire un ordine rigoroso del timestamp di commit per chiave primaria per l'elaborazione downstream.

Questo processo si ripeterà fino al termine della pipeline e fino all'elaborazione di tutti i record delle modifiche dei dati (oppure si ripeterà all'infinito se non è specificata un'ora di fine).

Tieni presente che questo esempio di codice utilizza stati e timer, anziché finestre, per eseguire l'ordinamento per chiave. La motivazione è che non è garantito che le finestre vengano elaborate in ordine. Ciò significa che le finestre meno recenti possono essere elaborate in un secondo momento rispetto a quelle più recenti, il che potrebbe comportare la mancata elaborazione degli ordini.

BreakRecordByModFn

Ogni record delle modifiche dei dati può contenere diverse mod. Ogni mod rappresenta un inserimento, un aggiornamento o un'eliminazione in un singolo valore di chiave primaria. Questa funzione suddivide ogni record di modifiche dei dati in record separati delle modifiche dei dati, uno per mod.

private static class BreakRecordByModFn extends DoFn<DataChangeRecord,
                                                     DataChangeRecord>  {
  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record, OutputReceiver<DataChangeRecord>
    outputReceiver) {
    record.getMods().stream()
      .map(
          mod ->
              new DataChangeRecord(
                  record.getPartitionToken(),
                  record.getCommitTimestamp(),
                  record.getServerTransactionId(),
                  record.isLastRecordInTransactionInPartition(),
                  record.getRecordSequence(),
                  record.getTableName(),
                  record.getRowType(),
                  Collections.singletonList(mod),
                  record.getModType(),
                  record.getValueCaptureType(),
                  record.getNumberOfRecordsInTransaction(),
                  record.getNumberOfPartitionsInTransaction(),
                  record.getTransactionTag(),
                  record.isSystemTransaction(),
                  record.getMetadata()))
      .forEach(outputReceiver::output);
  }
}

KeyByIdFn

Questa funzione accetta un valore DataChangeRecord e restituisce un valore DataChangeRecord codificato dalla chiave primaria Spanner sottoposta ad hashing in base a un valore intero.

private static class KeyByIdFn extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>>  {
  // NUMBER_OF_BUCKETS should be configured by the user to match their key cardinality
  // Here, we are choosing to hash the Spanner primary keys to a bucket index, in order to have a deterministic number
  // of states and timers for performance purposes.
  // Note that having too many buckets might have undesirable effects if it results in a low number of records per bucket
  // On the other hand, having too few buckets might also be problematic, since many keys will be contained within them.
  private static final int NUMBER_OF_BUCKETS = 1000;

  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record,
      OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
    int hashCode = (int) record.getMods().get(0).getKeysJson().hashCode();
    // Hash the received keys into a bucket in order to have a
    // deterministic number of buffers and timers.
    String bucketIndex = String.valueOf(hashCode % NUMBER_OF_BUCKETS);

    outputReceiver.output(KV.of(bucketIndex, record));
  }
}

BufferKeyUntilOutputTimestamp

I timer e i buffer sono per chiave. Questa funzione esegue il buffer di ogni record delle modifiche dei dati finché la filigrana non supera il timestamp in cui vogliamo restituire i record delle modifiche dei dati presenti nel buffer.

Questo codice utilizza un timer di loop per determinare quando svuotare il buffer:

  1. Quando rileva per la prima volta un record di modifiche dati per una chiave, imposta il timer in modo che si attivi in corrispondenza del timestamp di commit del record di modifiche dati + incrementIntervalSeconds (opzione configurabile dall'utente).
  2. Quando il timer si attiva, aggiunge a recordsToOutput tutti i record delle modifiche dei dati nel buffer con un timestamp inferiore alla scadenza del timer. Se nel buffer sono presenti record delle modifiche dei dati il cui timestamp è maggiore o uguale alla data e ora di scadenza del timer, questi vengono aggiunti di nuovo nel buffer anziché inviarli. Quindi, imposta il timer successivo sulla data di scadenza del timer attuale più incrementIntervalInSeconds.
  3. Se recordsToOutput non è vuoto, la funzione ordina i record delle modifiche dei dati in recordsToOutput in base al timestamp di commit e all'ID transazione e poi li restituisce.
private static class BufferKeyUntilOutputTimestamp extends
    DoFn<KV<String, DataChangeRecord>, KV<String, Iterable<DataChangeRecord>>>  {
  private static final Logger LOG =
      LoggerFactory.getLogger(BufferKeyUntilOutputTimestamp.class);

  private final long incrementIntervalInSeconds = 2;

  private BufferKeyUntilOutputTimestamp(long incrementIntervalInSeconds) {
    this.incrementIntervalInSeconds = incrementIntervalInSeconds;
  }

  @SuppressWarnings("unused")
  @TimerId("timer")
  private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @StateId("buffer")
  private final StateSpec<BagState<DataChangeRecord>> buffer = StateSpecs.bag();

  @StateId("keyString")
  private final StateSpec<ValueState<String>> keyString =
      StateSpecs.value(StringUtf8Coder.of());

  @ProcessElement
  public void process(
      @Element KV<String, DataChangeRecord> element,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @TimerId("timer") Timer timer,
      @StateId("keyString") ValueState<String> keyString) {
    buffer.add(element.getValue());

    // Only set the timer if this is the first time we are receiving a data change
    // record with this key.
    String elementKey = keyString.read();
    if (elementKey == null) {
      Instant commitTimestamp =
          new Instant(element.getValue().getCommitTimestamp().toSqlTimestamp());
      Instant outputTimestamp =
          commitTimestamp.plus(Duration.standardSeconds(incrementIntervalInSeconds));
      timer.set(outputTimestamp);
      keyString.write(element.getKey());
    }
  }

  @OnTimer("timer")
  public void onExpiry(
      OnTimerContext context,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @TimerId("timer") Timer timer,
      @StateId("keyString") ValueState<String> keyString) {
    if (!buffer.isEmpty().read()) {
      String elementKey = keyString.read();

      final List<DataChangeRecord> records =
          StreamSupport.stream(buffer.read().spliterator(), false)
              .collect(Collectors.toList());
      buffer.clear();

      List<DataChangeRecord> recordsToOutput = new ArrayList<>();
      for (DataChangeRecord record : records) {
        Instant recordCommitTimestamp =
            new Instant(record.getCommitTimestamp().toSqlTimestamp());
        final String recordString =
            record.getMods().get(0).getNewValuesJson().isEmpty()
                ? "Deleted record"
                : record.getMods().get(0).getNewValuesJson();
        // When the watermark passes time T, this means that all records with
        // event time < T have been processed and successfully committed. Since the
        // timer fires when the watermark passes the expiration time, we should
        // only output records with event time < expiration time.
        if (recordCommitTimestamp.isBefore(context.timestamp())) {
          LOG.info(
             "Outputting record with key {} and value {} at expiration " +
             "timestamp {}",
              elementKey,
              recordString,
              context.timestamp().toString());
          recordsToOutput.add(record);
        } else {
          LOG.info(
              "Expired at {} but adding record with key {} and value {} back to " +
              "buffer due to commit timestamp {}",
              context.timestamp().toString(),
              elementKey,
              recordString,
              recordCommitTimestamp.toString());
          buffer.add(record);
        }
      }

      // Output records, if there are any to output.
      if (!recordsToOutput.isEmpty()) {
        // Order the records in place, and output them. The user would need
        // to implement DataChangeRecordComparator class that sorts the
        // data change records by commit timestamp and transaction ID.
        Collections.sort(recordsToOutput, new DataChangeRecordComparator());
        context.outputWithTimestamp(
            KV.of(elementKey, recordsToOutput), context.timestamp());
        LOG.info(
            "Expired at {}, outputting records for key {}",
            context.timestamp().toString(),
            elementKey);
      } else {
        LOG.info("Expired at {} with no records", context.timestamp().toString());
      }
    }

    Instant nextTimer = context.timestamp().plus(Duration.standardSeconds(incrementIntervalInSeconds));
    if (buffer.isEmpty() != null && !buffer.isEmpty().read()) {
      LOG.info("Setting next timer to {}", nextTimer.toString());
      timer.set(nextTimer);
    } else {
      LOG.info(
          "Timer not being set since the buffer is empty: ");
      keyString.clear();
    }
  }
}

Transazioni ordini

Questa pipeline può essere modificata in modo da ordinare per ID transazione e timestamp di commit. Per farlo, inserisci i record nel buffer per ogni coppia ID transazione / timestamp di commit, anziché per ogni chiave Spanner. Questa operazione richiede la modifica del codice in KeyByIdFn.

Esempio: combinare transazioni

Questo esempio di codice legge i record delle modifiche dei dati, assembla tutti i record delle modifiche dei dati appartenenti alla stessa transazione in un singolo elemento e genera questo elemento. Tieni presente che le transazioni restituite da questo codice campione non sono ordinate in base al timestamp di commit.

Questo esempio di codice utilizza i buffer per assemblare le transazioni dai record delle modifiche dei dati. Quando riceve per la prima volta un record delle modifiche dei dati appartenente a una transazione, legge il campo numberOfRecordsInTransaction nel record delle modifiche dei dati, che descrive il numero previsto di record delle modifiche dei dati appartenenti a quella transazione. Memorizza nel buffer i record delle modifiche dei dati appartenenti a quella transazione finché il numero di record presenti nel buffer non corrisponde a numberOfRecordsInTransaction, quindi genera i record delle modifiche dei dati in bundle.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new KeyByTransactionIdFn()))
  .apply(ParDo.of(new TransactionBoundaryFn()))
  // Subsequent processing goes here

KeyByTransactionIdFn

Questa funzione acquisisce un valore DataChangeRecord e restituisce un valore DataChangeRecord specificato dall'ID transazione.

private static class KeyByTransactionIdFn extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>>  {
  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record,
      OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
    outputReceiver.output(KV.of(record.getServerTransactionId(), record));
  }
}

TransactionBoundaryFn

TransactionBoundaryFn buffer hanno ricevuto coppie chiave-valore di {TransactionId, DataChangeRecord} da KeyByTransactionIdFn e le memorizza in gruppi in base a TransactionId. Quando il numero di record nel buffer è uguale al numero di record contenuti nell'intera transazione, questa funzione ordina gli oggetti DataChangeRecord del gruppo per sequenza di record e restituisce una coppia chiave-valore di {CommitTimestamp, TransactionId}, Iterable<DataChangeRecord>.

In questo caso supponiamo che SortKey sia una classe definita dall'utente che rappresenta una coppia {CommitTimestamp, TransactionId}. Consulta l'implementazione di esempio per SortKey.

private static class TransactionBoundaryFn extends DoFn<KV<String, DataChangeRecord>, KV<SortKey, Iterable<DataChangeRecord>>>  {
  @StateId("buffer")
  private final StateSpec<BagState<DataChangeRecord>> buffer = StateSpecs.bag();

  @StateId("count")
  private final StateSpec<ValueState<Integer>> countState = StateSpecs.value();

  @ProcessElement
  public void process(
      ProcessContext context,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @StateId("count") ValueState<Integer> countState) {
    final KV<String, DataChangeRecord> element = context.element();
    final DataChangeRecord record = element.getValue();

    buffer.add(record);
    int count = (countState.read() != null ? countState.read() : 0);
    count = count + 1;
    countState.write(count);

    if (count == record.getNumberOfRecordsInTransaction()) {
      final List<DataChangeRecord> sortedRecords =
          StreamSupport.stream(buffer.read().spliterator(), false)
              .sorted(Comparator.comparing(DataChangeRecord::getRecordSequence))
              .collect(Collectors.toList());

      final Instant commitInstant =
          new Instant(sortedRecords.get(0).getCommitTimestamp().toSqlTimestamp()
              .getTime());
      context.outputWithTimestamp(
          KV.of(
              new SortKey(sortedRecords.get(0).getCommitTimestamp(),
                          sortedRecords.get(0).getServerTransactionId()),
              sortedRecords),
          commitInstant);
      buffer.clear();
      countState.clear();
    }
  }
}

Esempio: filtrare per tag transazione

Quando una transazione che modifica i dati utente viene codificata, il tag corrispondente e il relativo tipo vengono archiviati come parte di DataChangeRecord. Questi esempi mostrano come filtrare i record di modifiche in tempo reale in base ai tag di transazione definiti dall'utente e ai tag di sistema:

Filtro dei tag definito dall'utente per my-tx-tag:

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(Filter.by(record ->
           !record.isSystemTransaction()
           && record.getTransactionTag().equalsIgnoreCase("my-tx-tag")))
  // Subsequent processing goes here

Filtro dei tag di sistema/controllo TTL:

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(Filter.by(record ->
           record.isSystemTransaction()
           && record.getTransactionTag().equals("RowDeletionPolicy")))
  // Subsequent processing goes here

Esempio: recupera riga intera

Questo esempio funziona con una tabella Spanner denominata Singer che ha la definizione seguente:

CREATE TABLE Singers (
  SingerId INT64 NOT NULL,
  FirstName STRING(1024),
  LastName STRING(1024)
) PRIMARY KEY (SingerId);

Nella modalità di acquisizione del valore predefinita di OLD_AND_NEW_VALUES delle modifiche in tempo reale, in caso di aggiornamento di una riga di Spanner, il record delle modifiche dei dati ricevuto conterrà solo le colonne che sono state modificate. Le colonne monitorate ma non modificate non verranno incluse nel record. La chiave primaria della mod può essere utilizzata per leggere uno snapshot di Spanner al timestamp di commit del record della modifica dei dati per recuperare le colonne non modificate o persino recuperare l'intera riga.

Tieni presente che, per completare la lettura dello snapshot, potrebbe essere necessario modificare il criterio di conservazione del database impostandolo su un valore maggiore o uguale a quello del criterio di conservazione delle modifiche in tempo reale.

Tieni inoltre presente che l'utilizzo del tipo di acquisizione del valore NEW_ROW è il modo consigliato e più efficiente per farlo, poiché restituisce tutte le colonne monitorate della riga per impostazione predefinita e non richiede un'ulteriore lettura di snapshot in Spanner.

SpannerConfig spannerConfig = SpannerConfig
   .create()
   .withProjectId("my-project-id")
   .withInstanceId("my-instance-id")
   .withDatabaseId("my-database-id")
   .withDatabaseRole("my-database-role");   // Needed for fine-grained access control only

pipeline
   .apply(SpannerIO
       .readChangeStream()
       .withSpannerConfig(spannerConfig)
       // Assume we have a change stream "my-change-stream" that watches Singers table.
       .withChangeStreamName("my-change-stream")
       .withMetadataInstance("my-metadata-instance-id")
       .withMetadataDatabase("my-metadata-database-id")
       .withInclusiveStartAt(Timestamp.now()))
   .apply(ParDo.of(new ToFullRowJsonFn(spannerConfig)))
   // Subsequent processing goes here

ToFullRowJsonFn

Questa trasformazione eseguirà una lettura inattiva al timestamp di commit di ogni record ricevuto e mapperà l'intera riga a JSON.

public class ToFullRowJsonFn extends DoFn<DataChangeRecord, String> {
 // Since each instance of this DoFn will create its own session pool and will
 // perform calls to Spanner sequentially, we keep the number of sessions in
 // the pool small. This way, we avoid wasting resources.
 private static final int MIN_SESSIONS = 1;
 private static final int MAX_SESSIONS = 5;
 private final String projectId;
 private final String instanceId;
 private final String databaseId;

 private transient DatabaseClient client;
 private transient Spanner spanner;

 public ToFullRowJsonFn(SpannerConfig spannerConfig) {
   this.projectId = spannerConfig.getProjectId().get();
   this.instanceId = spannerConfig.getInstanceId().get();
   this.databaseId = spannerConfig.getDatabaseId().get();
 }

 @Setup
 public void setup() {
   SessionPoolOptions sessionPoolOptions = SessionPoolOptions
      .newBuilder()
      .setMinSessions(MIN_SESSIONS)
      .setMaxSessions(MAX_SESSIONS)
      .build();
   SpannerOptions options = SpannerOptions
       .newBuilder()
       .setProjectId(projectId)
       .setSessionPoolOption(sessionPoolOptions)
       .build();
   DatabaseId id = DatabaseId.of(projectId, instanceId, databaseId);
   spanner = options.getService();
   client = spanner.getDatabaseClient(id);
 }

 @Teardown
 public void teardown() {
   spanner.close();
 }

 @ProcessElement
 public void process(
   @Element DataChangeRecord element,
   OutputReceiver<String> output) {
   com.google.cloud.Timestamp commitTimestamp = element.getCommitTimestamp();
   element.getMods().forEach(mod -> {
     JSONObject keysJson = new JSONObject(mod.getKeysJson());
     JSONObject newValuesJson = new JSONObject(mod.getNewValuesJson());
     ModType modType = element.getModType();
     JSONObject jsonRow = new JSONObject();
     long singerId = keysJson.getLong("SingerId");
     jsonRow.put("SingerId", singerId);
     if (modType == ModType.INSERT) {
       // For INSERT mod, get non-primary key columns from mod.
       jsonRow.put("FirstName", newValuesJson.get("FirstName"));
       jsonRow.put("LastName", newValuesJson.get("LastName"));
     } else if (modType == ModType.UPDATE) {
       // For UPDATE mod, get non-primary key columns by doing a snapshot read using the primary key column from mod.
       try (ResultSet resultSet = client
         .singleUse(TimestampBound.ofReadTimestamp(commitTimestamp))
         .read(
           "Singers",
           KeySet.singleKey(com.google.cloud.spanner.Key.of(singerId)),
             Arrays.asList("FirstName", "LastName"))) {
         if (resultSet.next()) {
           jsonRow.put("FirstName", resultSet.isNull("FirstName") ?
             JSONObject.NULL : resultSet.getString("FirstName"));
           jsonRow.put("LastName", resultSet.isNull("LastName") ?
             JSONObject.NULL : resultSet.getString("LastName"));
         }
       }
     } else {
       // For DELETE mod, there is nothing to do, as we already set SingerId.
     }

     output.output(jsonRow.toString());
   });
 }
}

Questo codice crea un client di database Spanner per eseguire il recupero dell'intera riga e configura il pool di sessioni in modo che abbia poche sessioni, eseguendo letture in sequenza in un'istanza di ToFullRowJsonFn. Dataflow garantisce di generare molte istanze di questa funzione, ciascuna con il proprio pool di client.

Esempio: da Spanner a Pub/Sub

In questo scenario, il chiamante trasmette i flussi di record a Pub/Sub il più rapidamente possibile, senza alcun raggruppamento o aggregazione. Ciò è ideale per attivare l'elaborazione downstream, come l'inserimento di flussi di tutte le nuove righe inserite in una tabella Spanner in Pub/Sub per un'ulteriore elaborazione.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(MapElements.into(TypeDescriptors.strings()).via(Object::toString))
  .apply(PubsubIO.writeStrings().to("my-topic"));

Tieni presente che il sink Pub/Sub può essere configurato in modo da garantire la semantica exactly-once.

Esempio: da Spanner a Cloud Storage

In questo scenario, il chiamante raggruppa tutti i record all'interno di una determinata finestra e salva il gruppo in file Cloud Storage separati. È una soluzione ideale per l'analisi e l'archiviazione point-in-time, indipendente dal periodo di conservazione di Spanner.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(MapElements.into(TypeDescriptors.strings()).via(Object::toString))
  .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
  .apply(TextIO
    .write()
    .to("gs://my-bucket/change-stream-results-")
    .withSuffix(".txt")
    .withWindowedWrites()
    .withNumShards(1));

Tieni presente che il sink di Cloud Storage fornisce semantica "Almeno una volta" per impostazione predefinita. Con un'elaborazione aggiuntiva, può essere modificata in modo che abbia una semantica "exactly-once".

Forniamo anche un modello Dataflow per questo caso d'uso: vedi Connettere le modifiche in tempo reale a Cloud Storage.

Esempio: Spanner a BigQuery (tabella di registro)

Qui, i flussi del chiamante modificano i record in BigQuery. Ogni record delle modifiche ai dati viene riportato in una singola riga in BigQuery. È una buona soluzione per l'analisi. Questo codice utilizza le funzioni definite in precedenza, nella sezione Recupera riga completa, per recuperare l'intera riga del record e scriverla in BigQuery.

SpannerConfig spannerConfig = SpannerConfig
  .create()
  .withProjectId("my-project-id")
  .withInstanceId("my-instance-id")
  .withDatabaseId("my-database-id")
  .withDatabaseRole("my-database-role");   // Needed for fine-grained access control only

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(spannerConfig)
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new ToFullRowJsonFn(spannerConfig)))
  .apply(BigQueryIO
    .<String>write()
    .to("my-bigquery-table")
    .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
    .withWriteDisposition(Write.WriteDisposition.WRITE_APPEND)
    .withSchema(new TableSchema().setFields(Arrays.asList(
      new TableFieldSchema()
        .setName("SingerId")
        .setType("INT64")
        .setMode("REQUIRED"),
      new TableFieldSchema()
        .setName("FirstName")
        .setType("STRING")
        .setMode("REQUIRED"),
      new TableFieldSchema()
        .setName("LastName")
        .setType("STRING")
        .setMode("REQUIRED")
    )))
    .withAutoSharding()
    .optimizedWrites()
    .withFormatFunction((String element) -> {
      ObjectMapper objectMapper = new ObjectMapper();
      JsonNode jsonNode = null;
      try {
        jsonNode = objectMapper.readTree(element);
      } catch (IOException e) {
        e.printStackTrace();
      }
      return new TableRow()
        .set("SingerId", jsonNode.get("SingerId").asInt())
        .set("FirstName", jsonNode.get("FirstName").asText())
        .set("LastName", jsonNode.get("LastName").asText());
    }
  )
);

Tieni presente che il sink BigQuery fornisce la semantica "Almeno una volta" per impostazione predefinita. Con un'elaborazione aggiuntiva, può essere modificata in modo che abbia una semantica "exactly-once".

Forniamo anche un modello Dataflow per questo caso d'uso; vedi Connettere le modifiche in tempo reale a BigQuery.

Monitora una pipeline

Sono disponibili due classi di metriche per monitorare una pipeline Dataflow in modalità flusso di modifiche.

Metriche Dataflow standard

Dataflow fornisce diverse metriche per garantire l'integrità del job, come l'aggiornamento dei dati, il ritardo di sistema, la velocità effettiva del job, l'utilizzo della CPU da parte dei worker e altro ancora. Puoi trovare ulteriori informazioni in Utilizzo di Monitoring per le pipeline Dataflow.

Per le pipeline di modifiche in tempo reale, ci sono due metriche principali che devono essere prese in considerazione: la latenza del sistema e l'aggiornamento dei dati.

La latenza del sistema indicherà l'attuale durata massima (in secondi) durante il quale un dato viene elaborato o è in attesa di elaborazione.

L'aggiornamento dei dati mostrerà il periodo di tempo che intercorre tra adesso (in tempo reale) e la filigrana di output. La filigrana di output del tempo T indica che tutti gli elementi con un'ora dell'evento (strettamente) precedente a T sono stati elaborati per il calcolo. In altre parole, l'aggiornamento dei dati misura l'attualità della pipeline per quanto riguarda l'elaborazione degli eventi ricevuti.

Se la pipeline non dispone di risorse sufficienti, puoi vedere questo effetto in queste due metriche. La latenza del sistema aumenterà, perché gli elementi devono attendere più a lungo prima di essere elaborati. Anche l'aggiornamento dei dati aumenterà, perché la pipeline non sarà in grado di stare al passo con la quantità di dati ricevuti.

Metriche personalizzate delle modifiche in tempo reale

Queste metriche sono esposte in Cloud Monitoring e includono:

  • Latenza in bucket (istogramma) tra un record di cui viene eseguito il commit in Spanner e l'emissione in una PCollection da parte del connettore. Questa metrica può essere utilizzata per visualizzare eventuali problemi di prestazioni (latenza) con la pipeline.
  • Numero totale di record di dati letti. Si tratta di un'indicazione generale del numero di record emessi dal connettore. Questo numero dovrebbe essere in costante aumento, rispecchiando la tendenza delle scritture nel database Spanner sottostante.
  • Numero di partizioni attualmente in lettura. Le partizioni devono essere sempre lette. Se questo numero è zero, indica che si è verificato un errore nella pipeline.
  • Numero totale di query eseguite durante l'esecuzione del connettore. Questa è un'indicazione generale delle query di modifiche in tempo reale effettuate all'istanza Spanner durante l'esecuzione della pipeline. Può essere utilizzato per ottenere una stima del carico dal connettore al database Spanner.

Aggiorna una pipeline esistente

È possibile aggiornare una pipeline in esecuzione che utilizza il connettore SpannerIO per elaborare le modifiche in tempo reale se i controlli di compatibilità dei job vengono superati. Per farlo, devi impostare esplicitamente il parametro del nome della tabella dei metadati del nuovo job al momento dell'aggiornamento. Utilizza il valore dell'opzione pipeline metadataTable del job che stai aggiornando.

Se utilizzi un modello Dataflow fornito da Google, imposta il nome della tabella utilizzando il parametro spannerMetadataTableName. Puoi anche modificare il job esistente per utilizzare esplicitamente la tabella dei metadati con il metodo withMetadataTable(your-metadata-table-name) nella configurazione del connettore. Al termine, puoi seguire le istruzioni in Avvio del job di sostituzione dalla documentazione di Dataflow per aggiornare un job in esecuzione.

Best practice per le modifiche in tempo reale e Dataflow

Di seguito sono riportate alcune best practice per la creazione di connessioni di modifiche in tempo reale tramite Dataflow.

Usa un database di metadati separato

Consigliamo di creare un database separato per il connettore SpannerIO da utilizzare per l'archiviazione dei metadati, anziché configurarlo in modo che utilizzi il database dell'applicazione.

Per ulteriori informazioni, consulta Valutare l'uso di un database di metadati separato.

Dimensione del cluster

Una regola generale per il numero iniziale di worker in un job di modifiche in tempo reale di Spanner è un worker per 1000 scritture al secondo. Tieni presente che questa stima può variare in base a diversi fattori, tra cui le dimensioni di ogni transazione, il numero di record di modifiche in tempo reale prodotti da una singola transazione e altre trasformazioni, aggregazioni o sink utilizzati nella pipeline.

Dopo aver eseguito il provisioning iniziale, è importante tenere traccia delle metriche menzionate in Monitorare una pipeline per garantirne l'integrità. Consigliamo di fare esperimenti con la dimensione iniziale del pool di worker e di monitorare in che modo la pipeline gestisce il carico, aumentando il numero di nodi se necessario. L'utilizzo della CPU è una metrica chiave per verificare se il carico è corretto e se sono necessari più nodi.

Limitazioni note

Scalabilità automatica

Il supporto della scalabilità automatica per qualsiasi pipeline che include SpannerIO.readChangeStream richiede Apache Beam 2.39.0 o versioni successive.

Se utilizzi una versione Apache Beam precedente alla 2.39.0, le pipeline che includono SpannerIO.readChangeStream devono specificare esplicitamente l'algoritmo di scalabilità automatica come NONE, come descritto in Scalabilità automatica orizzontale.

Per scalare manualmente una pipeline Dataflow anziché utilizzare la scalabilità automatica, consulta Scalare manualmente una pipeline in modalità flusso.

Corridore V2

Il connettore di modifiche in tempo reale di Spanner richiede Dataflow Runner V2. Deve essere specificato manualmente durante l'esecuzione, altrimenti verrà generato un errore. Puoi specificare Runner V2 configurando il job con --experiments=use_unified_worker,use_runner_v2.

Snapshot

Il connettore di modifiche in tempo reale di Spanner non supporta gli snapshot di Dataflow.

Svuotamento in corso

Il connettore di modifiche in tempo reale di Spanner non supporta il svuotamento di un job. È possibile annullare solo un job esistente.

Puoi anche aggiornare una pipeline esistente senza doverla arrestare.

OpenCensus

Per utilizzare OpenCensus per monitorare la pipeline, specifica la versione 0.28.3 o successiva.

NullPointerException all'avvio della pipeline

Un bug nella versione 2.38.0 di Apache Beam può causare un errore NullPointerException all'avvio della pipeline in determinate condizioni. Ciò impedirebbe l'avvio del job e visualizzerà invece questo messaggio di errore:

java.lang.NullPointerException: null value in entry: Cloud Storage_PROJECT_ID=null

Per risolvere il problema, utilizza Apache Beam versione 2.39.0 o successiva oppure specifica manualmente la versione di beam-sdks-java-core come 2.37.0:

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-core</artifactId>
  <version>2.37.0</version>
</dependency>

Ulteriori informazioni