Creare connessioni di modifiche in tempo reale utilizzando Dataflow

Questa pagina mostra come creare pipeline Dataflow che consumare e inoltrare i dati di modifica di Spanner utilizzando flussi di modifiche. Puoi utilizzare il codice di esempio in questa pagina per creare pipeline personalizzate.

Concetti principali

Di seguito sono riportati alcuni concetti fondamentali per le pipeline di Dataflow per gli stream di variazioni.

Dataflow

Dataflow è un servizio serverless, veloce e conveniente che supporta sia l'elaborazione in modalità flusso che in batch. Offre portabilità con i job di elaborazione scritti utilizzando le librerie open source di Apache Beam e automatizza il provisioning dell'infrastruttura e la gestione dei cluster. Dataflow fornisce flussi di dati quasi in tempo reale durante la lettura dai modifiche in tempo reale.

Puoi usare Dataflow per applicare la modifica di Spanner i flussi di dati con SpannerIO , che offre un'astrazione sull'API Spanner per eseguire query sulle modifiche in tempo reale. Con questo connettore, non dovrai gestire le modifiche in tempo reale del ciclo di vita della partizione, necessaria quando utilizzi direttamente l'API Spanner. Il connettore fornisce un flusso di record di modifica dei dati in modo da poterti concentrare maggiormente sulla logica dell'applicazione e meno su dettagli specifici dell'API e sulla partizione dinamica dei flussi di modifica. Consigliamo di utilizzare il connettore SpannerIO anziché dell'API Spanner nella maggior parte dei casi in cui è necessario i dati delle modifiche in tempo reale.

I modelli Dataflow sono pipeline Dataflow predefinite che implementano casi d'uso comuni. Consulta Modelli Dataflow per una panoramica.

Pipeline Dataflow

Una pipeline Dataflow di stream di modifiche Spanner è composta da quattro parti principali:

  1. Un database Spanner con un flusso di modifiche
  2. Il connettore SpannerIO
  3. Trasformazioni e sink definiti dall'utente
  4. Uno scrittore I/O di sink

immagine

Di seguito vengono descritti in maggiore dettaglio ciascuno di questi aspetti.

Modifiche in tempo reale Spanner

Per maggiori dettagli su come creare un flusso di modifiche, consulta Creare un flusso di modifiche.

Connettore Apache Beam SpannerIO

Si tratta del connettore SpannerIO descritto in precedenza. È una connettore di I/O di origine che emette un PCollection di record delle modifiche dei dati nelle fasi successive della pipeline. La ora evento per ogni record delle modifiche dei dati emesse sarà il timestamp di commit. Tieni presente che i record emessi sono non ordinati e che il connettore SpannerIO garantisce che non ci saranno record in ritardo.

Quando si lavora con i flussi di modifiche, Dataflow utilizza i checkpoint. Di conseguenza, ogni worker potrebbe attendere fino all'intervallo di controllo configurato per il buffering delle modifiche prima di inviarle per l'ulteriore elaborazione.

Trasformazioni definite dall'utente

Una trasformazione definita dall'utente consente a un utente di aggregare, trasformare o modificare i dati di elaborazione all'interno di una pipeline Dataflow. Alcuni casi d'uso comuni sono la rimozione di informazioni che consentono l'identificazione personale, la soddisfazione dei requisiti relativi al formato dei dati a valle e l'ordinamento. Consulta le documentazione ufficiale di Apache Beam per la guida alla programmazione su trasforma.

Writer I/O sink Apache Beam

Apache Beam contiene trasformazioni I/O integrate che possono utilizzata per scrivere da una pipeline Dataflow in un data sink come BigQuery. I data sink più comuni sono supportati in modo nativo.

Modelli Dataflow

I modelli Dataflow offrono un modo semplice per creare job Dataflow basati su immagini Docker predefinite per casi d'uso comuni tramite la console Google Cloud, l'interfaccia a riga di comando Google Cloud o le chiamate API REST.

Per le modifiche in tempo reale di Spanner, forniamo tre modelli flessibili Dataflow:

Imposta autorizzazioni IAM per i modelli Dataflow

Prima di creare un job Dataflow con i tre modelli flessibili elencati, assicurati di disporre delle autorizzazioni IAM richieste per i seguenti account di servizio:

Se non hai le autorizzazioni IAM richieste, devi Specificare un account di servizio per il worker gestito dall'utente per creare il job Dataflow. Per ulteriori informazioni, vedi Sicurezza e autorizzazioni di Dataflow.

Quando provi a eseguire un job da un modello flessibile di Dataflow senza tutte le autorizzazioni richieste, il job potrebbe non riuscire con un errore di lettura del file del risultato o un errore di autorizzazione negata per la risorsa. Per ulteriori informazioni, vedi Risolvi i problemi relativi ai modelli flessibili.

Creare una pipeline Dataflow

Questa sezione illustra la configurazione iniziale del connettore e fornisce esempi per integrazioni comuni con le modifiche in tempo reale di Spanner funzionalità.

Per seguire questi passaggi, devi disporre di un ambiente di sviluppo Java per Dataflow. Per ulteriori informazioni, vedi Crea una pipeline Dataflow utilizzando Java.

Crea un flusso di modifiche

Per maggiori dettagli su come creare un flusso di modifiche, consulta Creare un flusso di modifiche. Per continuare con i passaggi successivi, devi avere un database Spanner con un flusso di modifiche configurato.

Concedere privilegi di controllo dell'accesso granulari

Se prevedi che il job Dataflow verrà eseguito da utenti di controllo dell'accesso granulare, assicura che agli utenti sia concesso l'accesso a un database ruolo con il privilegio SELECT per le modifiche in tempo reale e EXECUTE per la funzione con valori di tabella delle modifiche in tempo reale. Assicurati inoltre che specifica il ruolo del database nella configurazione SpannerIO o il modello flessibile di Dataflow.

Per ulteriori informazioni, consulta Informazioni sul controllo dell'accesso granulare.

Aggiungi il connettore SpannerIO come dipendenza

Il connettore Apache Beam SpannerIO incapsula la complessità di utilizzare i flussi di modifiche direttamente tramite l'API Cloud Spanner, emettendo una PCollection di record di dati del flusso di modifiche nelle fasi successive della pipeline.

Questi oggetti possono essere utilizzati in altre fasi della pipeline di Dataflow dell'utente. L'integrazione delle modifiche in tempo reale fa parte del connettore SpannerIO. Per poter utilizzare il connettore SpannerIO, devi aggiungere la dipendenza al file pom.xml:

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
  <version>${beam-version}</version> <!-- available from version 2.38.0 -->
</dependency>

crea un database di metadati

Il connettore deve tenere traccia di ogni partizione quando esegue nella pipeline Apache Beam. Conserva questi metadati in uno Spanner creata dal connettore durante l'inizializzazione. Specifica il database in cui verrà creata questa tabella durante la configurazione del connettore.

Come descritto nella sezione Best practice per gli stream di modifiche, consigliamo di creare un nuovo database per questo scopo, anziché consentire per utilizzare il database dell'applicazione al fine di archiviare la tabella dei metadati.

Il proprietario di un job Dataflow che utilizza il connettore SpannerIO deve avere impostato le seguenti autorizzazioni IAM con questo database di metadati:

  • spanner.databases.updateDdl
  • spanner.databases.beginReadOnlyTransaction
  • spanner.databases.beginOrRollbackReadWriteTransaction
  • spanner.databases.read
  • spanner.databases.select
  • spanner.databases.write
  • spanner.sessions.create
  • spanner.sessions.get

Configura il connettore

Il connettore per gli stream di modifiche di Spanner può essere configurato come segue:

SpannerConfig spannerConfig = SpannerConfig
  .create()
  .withProjectId("my-project-id")
  .withInstanceId("my-instance-id")
  .withDatabaseId("my-database-id")
  .withDatabaseRole("my-database-role");    // Needed for fine-grained access control only

Timestamp startTime = Timestamp.now();
Timestamp endTime = Timestamp.ofTimeSecondsAndNanos(
   startTime.getSeconds() + (10 * 60),
   startTime.getNanos()
);

SpannerIO
  .readChangeStream()
  .withSpannerConfig(spannerConfig)
  .withChangeStreamName("my-change-stream")
  .withMetadataInstance("my-meta-instance-id")
  .withMetadataDatabase("my-meta-database-id")
  .withMetadataTable("my-meta-table-name")
  .withRpcPriority(RpcPriority.MEDIUM)
  .withInclusiveStartAt(startTime)
  .withInclusiveEndAt(endTime);

Di seguito sono riportate le descrizioni delle opzioni di readChangeStream():

Configurazione Spanner (obbligatoria)

Utilizzato per configurare il progetto, l'istanza e il database in cui è stato creato il flusso di modifiche e da cui deve essere eseguita la query. Se vuoi, specifica anche il ruolo database da utilizzare quando l'entità IAM che esegue il job Dataflow è un utente di controllo dell'accesso granulare. Il job assume questo ruolo di database per l'accesso al flusso di modifiche. Per Per ulteriori informazioni, consulta Informazioni sul controllo dell'accesso granulare.

Nome del flusso di modifiche (obbligatorio)

Questo nome identifica in modo univoco il flusso di modifiche. Il nome qui deve essere lo stesso utilizzato durante la creazione dell'elemento.

(Facoltativo) ID istanza dei metadati

Questa è l'istanza in cui archiviare i metadati utilizzati dal connettore per controllare il consumo dei dati dell'API Change Stream.

ID database dei metadati (obbligatorio)

Si tratta del database in cui archiviare i metadati utilizzati dal connettore per controllare il consumo dei dati dell'API Change Stream.

Nome tabella metadati (facoltativo)

Dovrebbe essere utilizzato solo durante l'aggiornamento di una pipeline esistente.

Si tratta del nome della tabella di metadati preesistente che deve essere utilizzato dalla di rete. Viene utilizzato dal connettore per memorizzare i metadati al fine di controllare il consumo dei dati dell'API flusso di modifiche. Se questa opzione viene omessa, Spanner crea una nuova tabella con un nome generato all'inizializzazione del connettore.

Priorità RPC (facoltativa)

La priorità della richiesta da utilizzare per le query sugli stream di variazioni. Se questo parametro viene omesso, verrà utilizzato high priority.

InclusiveStartAt (obbligatorio)

Le modifiche apportate al timestamp specificato vengono restituite al chiamante.

InclusiveEndAt (facoltativo)

Le modifiche fino al timestamp specificato vengono restituite al chiamante. Se questo parametro viene omesso, le modifiche verranno applicate a tempo indeterminato.

Aggiungi trasformazioni e sink per elaborare i dati delle modifiche

Una volta completati i passaggi precedenti, il connettore SpannerIO configurato è pronto per emettere una PCollection di oggetti DataChangeRecord. Consulta Esempi di trasformazioni e destinazioni per diverse configurazioni di pipeline di esempio che elaborano questi dati in streaming in vari modi.

Tieni presente che i record dello stream delle modifiche emessi dal connettore SpannerIO non sono ordinati. Questo perché le PCollection non forniscono alcuna garanzia di ordinamento. Se hai bisogno di uno stream ordinato, devi raggruppare e ordinare i record come trasformazioni nelle pipeline: consulta Esempio: ordinamento per chiave. Puoi estendere questo esempio per ordinare i record in base a qualsiasi campo, ad esempio in base agli ID transazione.

Esempi di trasformazioni e sink

Puoi definire le tue trasformazioni e specificare i sink in cui scrivere i dati. La documentazione di Apache Beam fornisce una miriade di trasformazioni che possono essere applicate, nonché pronte per l'uso dei connettori I/O per scrivere i dati in sistemi esterni.

Esempio: ordine per chiave

Questo esempio di codice emette record delle modifiche dei dati ordinati per timestamp di commit e raggruppati per chiavi primarie utilizzando il connettore Dataflow.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new BreakRecordByModFn()))
  .apply(ParDo.of(new KeyByIdFn()))
  .apply(ParDo.of(new BufferKeyUntilOutputTimestamp()))
  // Subsequent processing goes here

Questo esempio di codice utilizza stati e timer per mettere in buffer i record per ogni chiave e imposta la data e l'ora di scadenza del timer su un'ora configurata dall'utente T in futuro (definita nella funzione BufferKeyUntilOutputTimestamp). Quando la marcatura temporale di Dataflow supera il tempo T, questo codice svuota tutti i record nel buffer con timestamp inferiore a T, li ordina in base al timestamp del commit e restituisce una coppia chiave-valore in cui:

  • La chiave è la chiave di input, ovvero la chiave principale sottoposta ad hashing in un array di bucket di dimensioni 1000.
  • Il valore corrisponde ai record delle modifiche ai dati ordinati memorizzati nel buffer per la chiave.

Per ogni token offriamo le seguenti garanzie:

  • L'attivazione dei timer è garantita in base al timestamp di scadenza.
  • È garantito che le fasi downstream ricevano gli elementi nello stesso ordine in cui sono stati prodotti.

Ad esempio, supponiamo che per una chiave con valore 100 il timer si attivi rispettivamente a T1 e T10, producendo un bundle di record delle modifiche dei dati a ogni timestamp. Poiché i record delle modifiche dei dati inviati alle T1 sono stati generati prima di quelli generati alle T10, è garantito che anche i record delle modifiche dei dati inviati alle T1 vengano ricevuti nella fase successiva prima che i record delle modifiche dei dati inviati alle T10. Questo meccanismo ci consente di garantire un'organizzazione rigorosa dei timestamp dei commit in base alla chiave primaria per l'elaborazione a valle.

Questo processo verrà ripetuto fino al termine della pipeline e all'elaborazione di tutti i record di modifica dei dati (oppure verrà ripetuto all'infinito se non viene specificata un'ora di fine).

Tieni presente che questo esempio di codice utilizza stati e timer anziché finestre per eseguire l'ordinamento per chiave. La motivazione è che le finestre non sono garantite da elaborare in ordine. Ciò significa che le finestre meno recenti possono essere elaborate in un secondo momento rispetto a quelle più recenti, il che potrebbe comportare la mancata elaborazione degli ordini.

BreakRecordByModFn

Ogni record delle modifiche dei dati può contenere diverse mod. Ogni mod rappresenta un inserimento, un aggiornamento o un'eliminazione di un singolo valore della chiave primaria. Questa funzione suddivide ogni record di modifica dei dati in record di modifica dei dati separati, uno per mod.

private static class BreakRecordByModFn extends DoFn<DataChangeRecord,
                                                     DataChangeRecord>  {
  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record, OutputReceiver<DataChangeRecord>
    outputReceiver) {
    record.getMods().stream()
      .map(
          mod ->
              new DataChangeRecord(
                  record.getPartitionToken(),
                  record.getCommitTimestamp(),
                  record.getServerTransactionId(),
                  record.isLastRecordInTransactionInPartition(),
                  record.getRecordSequence(),
                  record.getTableName(),
                  record.getRowType(),
                  Collections.singletonList(mod),
                  record.getModType(),
                  record.getValueCaptureType(),
                  record.getNumberOfRecordsInTransaction(),
                  record.getNumberOfPartitionsInTransaction(),
                  record.getTransactionTag(),
                  record.isSystemTransaction(),
                  record.getMetadata()))
      .forEach(outputReceiver::output);
  }
}

KeyByIdFn

Questa funzione accetta un valore DataChangeRecord e restituisce un valore DataChangeRecord codificato dalla chiave primaria Spanner con hashing in base a un valore intero.

private static class KeyByIdFn extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>>  {
  // NUMBER_OF_BUCKETS should be configured by the user to match their key cardinality
  // Here, we are choosing to hash the Spanner primary keys to a bucket index, in order to have a deterministic number
  // of states and timers for performance purposes.
  // Note that having too many buckets might have undesirable effects if it results in a low number of records per bucket
  // On the other hand, having too few buckets might also be problematic, since many keys will be contained within them.
  private static final int NUMBER_OF_BUCKETS = 1000;

  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record,
      OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
    int hashCode = (int) record.getMods().get(0).getKeysJson().hashCode();
    // Hash the received keys into a bucket in order to have a
    // deterministic number of buffers and timers.
    String bucketIndex = String.valueOf(hashCode % NUMBER_OF_BUCKETS);

    outputReceiver.output(KV.of(bucketIndex, record));
  }
}

BufferKeyUntilOutputTimestamp

I timer e i buffer sono per chiave. Questa funzione esegue il buffer di ogni record delle modifiche dei dati finché la filigrana non supera il timestamp in cui vogliamo restituire i record delle modifiche dei dati presenti nel buffer.

Questo codice utilizza un timer di loop per determinare quando svuotare il buffer:

  1. Quando rileva per la prima volta un record di modifiche dati per una chiave, imposta il timer in modo che si attivi in corrispondenza del timestamp di commit del record di modifiche dati + incrementIntervalSeconds (opzione configurabile dall'utente).
  2. Quando il timer si attiva, aggiunge a recordsToOutput tutti i record di modifica dei dati nel buffer con timestamp inferiore alla data e ora di scadenza del timer. Se nel buffer sono presenti record delle modifiche dei dati il cui timestamp è maggiore o uguale alla data di scadenza del timer, questi vengono aggiunti di nuovo nel buffer anziché inviarli. Imposta quindi il timer successivo sull'ora di scadenza del timer corrente più incrementIntervalInSeconds.
  3. Se recordsToOutput non è vuoto, la funzione ordina i record delle modifiche dei dati in recordsToOutput in base al timestamp di commit e all'ID transazione e poi li restituisce.
private static class BufferKeyUntilOutputTimestamp extends
    DoFn<KV<String, DataChangeRecord>, KV<String, Iterable<DataChangeRecord>>>  {
  private static final Logger LOG =
      LoggerFactory.getLogger(BufferKeyUntilOutputTimestamp.class);

  private final long incrementIntervalInSeconds = 2;

  private BufferKeyUntilOutputTimestamp(long incrementIntervalInSeconds) {
    this.incrementIntervalInSeconds = incrementIntervalInSeconds;
  }

  @SuppressWarnings("unused")
  @TimerId("timer")
  private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @StateId("buffer")
  private final StateSpec<BagState<DataChangeRecord>> buffer = StateSpecs.bag();

  @StateId("keyString")
  private final StateSpec<ValueState<String>> keyString =
      StateSpecs.value(StringUtf8Coder.of());

  @ProcessElement
  public void process(
      @Element KV<String, DataChangeRecord> element,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @TimerId("timer") Timer timer,
      @StateId("keyString") ValueState<String> keyString) {
    buffer.add(element.getValue());

    // Only set the timer if this is the first time we are receiving a data change
    // record with this key.
    String elementKey = keyString.read();
    if (elementKey == null) {
      Instant commitTimestamp =
          new Instant(element.getValue().getCommitTimestamp().toSqlTimestamp());
      Instant outputTimestamp =
          commitTimestamp.plus(Duration.standardSeconds(incrementIntervalInSeconds));
      timer.set(outputTimestamp);
      keyString.write(element.getKey());
    }
  }

  @OnTimer("timer")
  public void onExpiry(
      OnTimerContext context,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @TimerId("timer") Timer timer,
      @StateId("keyString") ValueState<String> keyString) {
    if (!buffer.isEmpty().read()) {
      String elementKey = keyString.read();

      final List<DataChangeRecord> records =
          StreamSupport.stream(buffer.read().spliterator(), false)
              .collect(Collectors.toList());
      buffer.clear();

      List<DataChangeRecord> recordsToOutput = new ArrayList<>();
      for (DataChangeRecord record : records) {
        Instant recordCommitTimestamp =
            new Instant(record.getCommitTimestamp().toSqlTimestamp());
        final String recordString =
            record.getMods().get(0).getNewValuesJson().isEmpty()
                ? "Deleted record"
                : record.getMods().get(0).getNewValuesJson();
        // When the watermark passes time T, this means that all records with
        // event time < T have been processed and successfully committed. Since the
        // timer fires when the watermark passes the expiration time, we should
        // only output records with event time < expiration time.
        if (recordCommitTimestamp.isBefore(context.timestamp())) {
          LOG.info(
             "Outputting record with key {} and value {} at expiration " +
             "timestamp {}",
              elementKey,
              recordString,
              context.timestamp().toString());
          recordsToOutput.add(record);
        } else {
          LOG.info(
              "Expired at {} but adding record with key {} and value {} back to " +
              "buffer due to commit timestamp {}",
              context.timestamp().toString(),
              elementKey,
              recordString,
              recordCommitTimestamp.toString());
          buffer.add(record);
        }
      }

      // Output records, if there are any to output.
      if (!recordsToOutput.isEmpty()) {
        // Order the records in place, and output them. The user would need
        // to implement DataChangeRecordComparator class that sorts the
        // data change records by commit timestamp and transaction ID.
        Collections.sort(recordsToOutput, new DataChangeRecordComparator());
        context.outputWithTimestamp(
            KV.of(elementKey, recordsToOutput), context.timestamp());
        LOG.info(
            "Expired at {}, outputting records for key {}",
            context.timestamp().toString(),
            elementKey);
      } else {
        LOG.info("Expired at {} with no records", context.timestamp().toString());
      }
    }

    Instant nextTimer = context.timestamp().plus(Duration.standardSeconds(incrementIntervalInSeconds));
    if (buffer.isEmpty() != null && !buffer.isEmpty().read()) {
      LOG.info("Setting next timer to {}", nextTimer.toString());
      timer.set(nextTimer);
    } else {
      LOG.info(
          "Timer not being set since the buffer is empty: ");
      keyString.clear();
    }
  }
}

Ordinamento delle transazioni

Questa pipeline può essere modificata in modo da ordinare i dati in base all'ID transazione e al timestamp del commit. A tale scopo, memorizza in un buffer i record per ogni coppia di ID transazione/timestamp commit anziché per ogni chiave Spanner. Questa operazione richiede la modifica del codice in KeyByIdFn.

Esempio: Assemblare transazioni

Questo esempio di codice legge i record delle modifiche dei dati, assembla tutti i record delle modifiche dei dati appartenenti alla stessa transazione in un singolo elemento e genera questo elemento. Tieni presente che le transazioni restituite da questo codice campione non sono ordinate in base al timestamp di commit.

Questo esempio di codice utilizza i buffer per combinare le transazioni dai record delle modifiche dei dati. Quando riceve per la prima volta un record di variazione dei dati appartenente a una transazione, legge il campo numberOfRecordsInTransaction nel record di variazione dei dati, che descrive il numero previsto di record di variazione dei dati appartenenti a quella transazione. Memorizza in un buffer i record di variazione dei dati appartenenti a quella transazione finché il numero di record in buffer non corrisponde a numberOfRecordsInTransaction, dopodiché emette i record di variazione dei dati raggruppati.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new KeyByTransactionIdFn()))
  .apply(ParDo.of(new TransactionBoundaryFn()))
  // Subsequent processing goes here

KeyByTransactionIdFn

Questa funzione riceve un DataChangeRecord e restituisce un DataChangeRecord basato sull'ID transazione.

private static class KeyByTransactionIdFn extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>>  {
  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record,
      OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
    outputReceiver.output(KV.of(record.getServerTransactionId(), record));
  }
}

TransactionBoundaryFn

TransactionBoundaryFn memorizza le coppie chiave-valore ricevute di {TransactionId, DataChangeRecord} da KeyByTransactionIdFn e le memorizza in gruppi in base a TransactionId. Quando il numero di record contenuti nel buffer è uguale al numero di record contenuti nei l'intera transazione, questa funzione ordina gli oggetti DataChangeRecord nel gruppo per sequenza di record e restituisce una coppia chiave-valore {CommitTimestamp, TransactionId} e Iterable<DataChangeRecord>.

Qui assumiamo che SortKey sia una classe definita dall'utente che rappresenta una coppia {CommitTimestamp, TransactionId}. Consulta l'implementazione di esempio per SortKey.

private static class TransactionBoundaryFn extends DoFn<KV<String, DataChangeRecord>, KV<SortKey, Iterable<DataChangeRecord>>>  {
  @StateId("buffer")
  private final StateSpec<BagState<DataChangeRecord>> buffer = StateSpecs.bag();

  @StateId("count")
  private final StateSpec<ValueState<Integer>> countState = StateSpecs.value();

  @ProcessElement
  public void process(
      ProcessContext context,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @StateId("count") ValueState<Integer> countState) {
    final KV<String, DataChangeRecord> element = context.element();
    final DataChangeRecord record = element.getValue();

    buffer.add(record);
    int count = (countState.read() != null ? countState.read() : 0);
    count = count + 1;
    countState.write(count);

    if (count == record.getNumberOfRecordsInTransaction()) {
      final List<DataChangeRecord> sortedRecords =
          StreamSupport.stream(buffer.read().spliterator(), false)
              .sorted(Comparator.comparing(DataChangeRecord::getRecordSequence))
              .collect(Collectors.toList());

      final Instant commitInstant =
          new Instant(sortedRecords.get(0).getCommitTimestamp().toSqlTimestamp()
              .getTime());
      context.outputWithTimestamp(
          KV.of(
              new SortKey(sortedRecords.get(0).getCommitTimestamp(),
                          sortedRecords.get(0).getServerTransactionId()),
              sortedRecords),
          commitInstant);
      buffer.clear();
      countState.clear();
    }
  }
}

Esempio: filtrare per tag transazione

Quando una transazione che modifica i dati utente viene taggata, il tag corrispondente e il relativo tipo vengono memorizzati all'interno di DataChangeRecord. Questi esempi mostrano come filtrare i record di modifiche in tempo reale in base ai tag di transazione definiti dall'utente e ai tag di sistema:

Filtro dei tag definiti dall'utente per my-tx-tag:

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(Filter.by(record ->
           !record.isSystemTransaction()
           && record.getTransactionTag().equalsIgnoreCase("my-tx-tag")))
  // Subsequent processing goes here

Controllo del filtro dei tag di sistema/TTL:

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(Filter.by(record ->
           record.isSystemTransaction()
           && record.getTransactionTag().equals("RowDeletionPolicy")))
  // Subsequent processing goes here

Esempio: recupera riga intera

Questo esempio funziona con una tabella Spanner denominata Singer con la seguente definizione:

CREATE TABLE Singers (
  SingerId INT64 NOT NULL,
  FirstName STRING(1024),
  LastName STRING(1024)
) PRIMARY KEY (SingerId);

Nella modalità di acquisizione del valore predefinita di OLD_AND_NEW_VALUES delle modifiche in tempo reale, in caso di aggiornamento di una riga di Spanner, il record delle modifiche dei dati ricevuto conterrà solo le colonne che sono state modificate. Le colonne monitorate, ma invariate, non verranno incluse nel record. La chiave primaria della modifica può essere utilizzata per eseguire una lettura dello snapshot di Spanner al timestamp del commit del record di modifica dei dati per recuperare le colonne invariate o persino la riga completa.

Tieni presente che il criterio di conservazione del database potrebbe dover essere modificato in un valore maggiore o uguale al criterio di conservazione del flusso di modifiche affinché la lettura dello snapshot vada a buon fine.

Tieni inoltre presente che l'utilizzo del tipo di acquisizione del valore NEW_ROW è il modo consigliato ed efficiente per farlo, poiché restituisce per impostazione predefinita tutte le colonne monitorate della riga e non richiede la lettura di uno snapshot aggiuntivo in Spanner.

SpannerConfig spannerConfig = SpannerConfig
   .create()
   .withProjectId("my-project-id")
   .withInstanceId("my-instance-id")
   .withDatabaseId("my-database-id")
   .withDatabaseRole("my-database-role");   // Needed for fine-grained access control only

pipeline
   .apply(SpannerIO
       .readChangeStream()
       .withSpannerConfig(spannerConfig)
       // Assume we have a change stream "my-change-stream" that watches Singers table.
       .withChangeStreamName("my-change-stream")
       .withMetadataInstance("my-metadata-instance-id")
       .withMetadataDatabase("my-metadata-database-id")
       .withInclusiveStartAt(Timestamp.now()))
   .apply(ParDo.of(new ToFullRowJsonFn(spannerConfig)))
   // Subsequent processing goes here

ToFullRowJsonFn

Questa trasformazione eseguirà una lettura inattiva al timestamp di commit di ogni record ricevuto e mapperà l'intera riga a JSON.

public class ToFullRowJsonFn extends DoFn<DataChangeRecord, String> {
 // Since each instance of this DoFn will create its own session pool and will
 // perform calls to Spanner sequentially, we keep the number of sessions in
 // the pool small. This way, we avoid wasting resources.
 private static final int MIN_SESSIONS = 1;
 private static final int MAX_SESSIONS = 5;
 private final String projectId;
 private final String instanceId;
 private final String databaseId;

 private transient DatabaseClient client;
 private transient Spanner spanner;

 public ToFullRowJsonFn(SpannerConfig spannerConfig) {
   this.projectId = spannerConfig.getProjectId().get();
   this.instanceId = spannerConfig.getInstanceId().get();
   this.databaseId = spannerConfig.getDatabaseId().get();
 }

 @Setup
 public void setup() {
   SessionPoolOptions sessionPoolOptions = SessionPoolOptions
      .newBuilder()
      .setMinSessions(MIN_SESSIONS)
      .setMaxSessions(MAX_SESSIONS)
      .build();
   SpannerOptions options = SpannerOptions
       .newBuilder()
       .setProjectId(projectId)
       .setSessionPoolOption(sessionPoolOptions)
       .build();
   DatabaseId id = DatabaseId.of(projectId, instanceId, databaseId);
   spanner = options.getService();
   client = spanner.getDatabaseClient(id);
 }

 @Teardown
 public void teardown() {
   spanner.close();
 }

 @ProcessElement
 public void process(
   @Element DataChangeRecord element,
   OutputReceiver<String> output) {
   com.google.cloud.Timestamp commitTimestamp = element.getCommitTimestamp();
   element.getMods().forEach(mod -> {
     JSONObject keysJson = new JSONObject(mod.getKeysJson());
     JSONObject newValuesJson = new JSONObject(mod.getNewValuesJson());
     ModType modType = element.getModType();
     JSONObject jsonRow = new JSONObject();
     long singerId = keysJson.getLong("SingerId");
     jsonRow.put("SingerId", singerId);
     if (modType == ModType.INSERT) {
       // For INSERT mod, get non-primary key columns from mod.
       jsonRow.put("FirstName", newValuesJson.get("FirstName"));
       jsonRow.put("LastName", newValuesJson.get("LastName"));
     } else if (modType == ModType.UPDATE) {
       // For UPDATE mod, get non-primary key columns by doing a snapshot read using the primary key column from mod.
       try (ResultSet resultSet = client
         .singleUse(TimestampBound.ofReadTimestamp(commitTimestamp))
         .read(
           "Singers",
           KeySet.singleKey(com.google.cloud.spanner.Key.of(singerId)),
             Arrays.asList("FirstName", "LastName"))) {
         if (resultSet.next()) {
           jsonRow.put("FirstName", resultSet.isNull("FirstName") ?
             JSONObject.NULL : resultSet.getString("FirstName"));
           jsonRow.put("LastName", resultSet.isNull("LastName") ?
             JSONObject.NULL : resultSet.getString("LastName"));
         }
       }
     } else {
       // For DELETE mod, there is nothing to do, as we already set SingerId.
     }

     output.output(jsonRow.toString());
   });
 }
}

Questo codice crea un client di database Spanner per eseguire il recupero dell'intera riga e configura il pool di sessioni in modo che abbia poche sessioni, eseguendo letture in sequenza in un'istanza di ToFullRowJsonFn. Dataflow garantisce di generare molte istanze di questa funzione, ciascuna con il proprio pool di client.

Esempio: da Spanner a Pub/Sub

In questo scenario, il chiamante trasmette i flussi di record a Pub/Sub come il più rapidamente possibile, senza alcun raggruppamento o aggregazione. Questo è un buon adatti all'attivazione dell'elaborazione downstream, come l'inserimento di flussi di tutte le nuove righe inserito in una tabella Spanner Pub/Sub per ulteriori informazioni e l'elaborazione dei dati.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(MapElements.into(TypeDescriptors.strings()).via(Object::toString))
  .apply(PubsubIO.writeStrings().to("my-topic"));

Tieni presente che il sink Pub/Sub può essere configurato in modo da garantire la semantica exactly-once.

Esempio: Spanner to Cloud Storage

In questo scenario, il chiamante raggruppa tutti i record all'interno di una determinata finestra e salva il gruppo in file Cloud Storage separati. È una soluzione ideale per l'analisi e l'archiviazione point-in-time, indipendente dal periodo di conservazione di Spanner.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(MapElements.into(TypeDescriptors.strings()).via(Object::toString))
  .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
  .apply(TextIO
    .write()
    .to("gs://my-bucket/change-stream-results-")
    .withSuffix(".txt")
    .withWindowedWrites()
    .withNumShards(1));

Tieni presente che l'emissario Cloud Storage fornisce la semantica almeno una volta per impostazione predefinita. Con un'elaborazione aggiuntiva, può essere modificato in modo da avere una semantica exactly-once.

Forniamo anche un modello Dataflow per questo caso d'uso: consulta Collegare gli stream di modifiche a Cloud Storage.

Esempio: Spanner a BigQuery (tabella di registro)

Qui, i flussi del chiamante modificano i record in BigQuery. Ogni record delle modifiche ai dati viene riportato in una singola riga in BigQuery. Questa è una buona soluzione per l'analisi. Questo codice utilizza le funzioni definite in precedenza, nella sezione Recupera riga completa, per recuperare l'intera riga del record e scriverla in BigQuery.

SpannerConfig spannerConfig = SpannerConfig
  .create()
  .withProjectId("my-project-id")
  .withInstanceId("my-instance-id")
  .withDatabaseId("my-database-id")
  .withDatabaseRole("my-database-role");   // Needed for fine-grained access control only

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(spannerConfig)
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new ToFullRowJsonFn(spannerConfig)))
  .apply(BigQueryIO
    .<String>write()
    .to("my-bigquery-table")
    .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
    .withWriteDisposition(Write.WriteDisposition.WRITE_APPEND)
    .withSchema(new TableSchema().setFields(Arrays.asList(
      new TableFieldSchema()
        .setName("SingerId")
        .setType("INT64")
        .setMode("REQUIRED"),
      new TableFieldSchema()
        .setName("FirstName")
        .setType("STRING")
        .setMode("REQUIRED"),
      new TableFieldSchema()
        .setName("LastName")
        .setType("STRING")
        .setMode("REQUIRED")
    )))
    .withAutoSharding()
    .optimizedWrites()
    .withFormatFunction((String element) -> {
      ObjectMapper objectMapper = new ObjectMapper();
      JsonNode jsonNode = null;
      try {
        jsonNode = objectMapper.readTree(element);
      } catch (IOException e) {
        e.printStackTrace();
      }
      return new TableRow()
        .set("SingerId", jsonNode.get("SingerId").asInt())
        .set("FirstName", jsonNode.get("FirstName").asText())
        .set("LastName", jsonNode.get("LastName").asText());
    }
  )
);

Tieni presente che il sink BigQuery fornisce la semantica "Almeno una volta" per impostazione predefinita. Con un'elaborazione aggiuntiva, può essere modificato in modo da avere una semantica exactly-once.

Forniamo anche un modello Dataflow per questo caso d'uso; consulta Connettere le modifiche in tempo reale a BigQuery.

Monitorare una pipeline

Esistono due classi di metriche disponibili per monitorare una pipeline Dataflow di stream di modifiche.

Metriche Dataflow standard

Dataflow fornisce diverse metriche per garantire l'integrità del job, come l'aggiornamento dei dati, il ritardo di sistema, la velocità effettiva del job, l'utilizzo della CPU da parte dei worker e altro ancora. Puoi trovare ulteriori informazioni in Utilizzo di Monitoring per le pipeline Dataflow.

Per le pipeline di modifiche in tempo reale, ci sono due metriche principali che devono essere prese in considerazione: la latenza del sistema e l'aggiornamento dei dati.

La latenza del sistema indicherà l'attuale durata massima (in secondi) durante il quale un dato viene elaborato o è in attesa di elaborazione.

L'aggiornamento dei dati mostrerà il periodo di tempo che intercorre tra adesso (in tempo reale) e la filigrana di output. La filigrana di output del tempo T indica che tutti gli elementi con un'ora dell'evento (strettamente) precedente a T sono stati elaborati per il calcolo. In altre parole, l'aggiornamento dei dati misura l'aggiornamento della pipeline in merito all'elaborazione degli eventi ricevuti.

Se la pipeline non dispone di risorse sufficienti, puoi vedere questo effetto in queste due metriche. La latenza del sistema aumenterà, perché gli elementi devono attendere più a lungo prima di essere elaborati. Anche l'aggiornamento dei dati aumenterà, perché la pipeline non sarà in grado di stare al passo con la quantità di dati ricevuti.

Metriche personalizzate delle modifiche in tempo reale

Queste metriche sono esposte in Cloud Monitoring e includono:

  • Latenza in bucket (istogramma) tra un record di cui viene eseguito il commit in Spanner e l'emissione in una PCollection da parte del connettore. Questa metrica può essere utilizzata per visualizzare eventuali problemi di prestazioni (latenza) della pipeline.
  • Numero totale di record di dati letti. Si tratta di un'indicazione generale del numero di record emessi dal connettore. Questo numero dovrebbe aumentare costantemente, rispecchiando la tendenza delle scritture nel database Spanner sottostante.
  • Numero di partizioni attualmente in lettura. Dovrebbero sempre essere lette partizioni. Se questo numero è pari a zero, indica che si è verificato un errore nella pipeline.
  • Numero totale di query emesse durante l'esecuzione del connettore. Questa è un'indicazione generale delle query di modifiche in tempo reale effettuate all'istanza Spanner durante l'esecuzione della pipeline. Questo può essere utilizzato per ottenere una stima del carico dal connettore al database Spanner.

Aggiornare una pipeline esistente

È possibile aggiornare una pipeline in esecuzione che utilizza SpannerIO per elaborare modifiche in tempo reale se la compatibilità del job controlli superati. Da fare Devi impostare esplicitamente il parametro del nome della tabella dei metadati il nuovo job quando lo aggiorni. Utilizza il valore dell'attributo metadataTable dal job che stai aggiornando.

Se utilizzi un modello Dataflow fornito da Google, imposta il nome della tabella utilizzando il parametro spannerMetadataTableName. Puoi anche modificare il job esistente in modo da utilizzare esplicitamente la tabella dei metadati con il metodo withMetadataTable(your-metadata-table-name) nella configurazione del connettore. Al termine, puoi seguire le le istruzioni in Avviare il dispositivo sostitutivo job dal documentazione di Dataflow per aggiornare un job in esecuzione.

Best practice per gli stream di modifiche e Dataflow

Di seguito sono riportate alcune best practice per la creazione di connessioni alle modifiche in tempo reale con Dataflow.

Usa un database di metadati separato

Consigliamo di creare un database separato da utilizzare per lo spazio di archiviazione dei metadati dal connettore SpannerIO, anziché configurarlo per utilizzare il database dell'applicazione.

Per ulteriori informazioni, vedi Valuta l'utilizzo di un database di metadati separato.

Dimensiona il cluster

Una regola empirica per il numero iniziale di worker in un Il job di modifiche in tempo reale Spanner è di un worker per 1000 scritture secondo. Tieni presente che questa stima può variare a seconda da diversi fattori, come le dimensioni di ogni transazione, il numero di modifiche vengono generati da una singola transazione e da altre trasformazioni, e le aggregazioni o i sink usati nella pipeline.

Dopo il provisioning iniziale, è importante tenere traccia delle metriche menzionate in Monitora una pipeline, per assicurarti che la pipeline sia integro. Consigliamo di sperimentare con un modello le dimensioni del pool di worker e monitorare che gestisce il carico, aumentando il numero di nodi se necessario. L'utilizzo della CPU è una metrica chiave per verificare se il carico è corretto e se sono necessari più nodi.

Limitazioni note

Scalabilità automatica

Il supporto della scalabilità automatica per le pipeline che includono SpannerIO.readChangeStream richiede Apache Beam 2.39.0 o versioni successive.

Se utilizzi una versione Apache Beam precedente a 2.39.0, le pipeline che includono SpannerIO.readChangeStream devono specificare esplicitamente la scalabilità automatica dell'algoritmo come NONE, come descritto Scalabilità automatica orizzontale.

Per eseguire la scalabilità manuale di una pipeline Dataflow anziché utilizzare la scalabilità automatica, consulta Eseguire la scalabilità manuale di una pipeline in streaming.

Corridore V2

Il connettore per i flussi di modifiche di Spanner richiede Dataflow Runner v2. Questo valore deve essere specificato manualmente durante l'esecuzione, altrimenti verrà visualizzato un errore lanciate. Puoi specificare Runner V2 configurando il job con --experiments=use_unified_worker,use_runner_v2.

Snapshot

Il connettore di modifiche in tempo reale di Spanner non supporta Snapshot di Dataflow.

Svuotamento in corso

Il connettore di modifiche in tempo reale di Spanner non supporta il svuotamento di un job. È possibile annullare solo un job esistente.

Puoi anche aggiornare una pipeline esistente. senza doverla fermare.

OpenCensus

Per utilizzare OpenCensus per monitorare la pipeline, specifica la versione 0.28.3 o versioni successive.

NullPointerException all'avvio della pipeline

Un bug nella versione 2.38.0 di Apache Beam può causare un NullPointerException all'avvio della pipeline in determinate condizioni. Ciò impedirà l'avvio del job e verrà visualizzato il seguente messaggio di errore:

java.lang.NullPointerException: null value in entry: Cloud Storage_PROJECT_ID=null

Per risolvere il problema, utilizza Apache Beam versione 2.39.0 o successiva oppure specifica manualmente la versione di beam-sdks-java-core sotto forma di 2.37.0:

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-core</artifactId>
  <version>2.37.0</version>
</dependency>

Ulteriori informazioni