Crea connessioni di modifiche in tempo reale utilizzando Dataflow

Questa pagina mostra come creare pipeline Dataflow che utilizzano e inoltrano i dati delle modifiche di Spanner utilizzando i flussi di modifiche. Puoi usare il codice di esempio su questa pagina per creare pipeline personalizzate.

Concetti principali

Di seguito sono riportati alcuni concetti fondamentali delle pipeline Dataflow per modifiche in tempo reale.

Dataflow

Dataflow è un servizio serverless, veloce e conveniente che supporta l'elaborazione in modalità flusso e batch. Offre portabilità con job di elaborazione scritti utilizzando le librerie open source Apache Beam e automatizza il provisioning dell'infrastruttura e la gestione dei cluster. Dataflow fornisce flussi di dati quasi in tempo reale durante la lettura dai modifiche in tempo reale.

Puoi utilizzare Dataflow per utilizzare i flussi di modifiche di Spanner con il connettore SpannerIO, che offre un'astrazione dell'API Spanner per eseguire query sui modifiche in tempo reale. Con questo connettore, non devi gestire il ciclo di vita della partizione delle modifiche in tempo reale, che è necessario quando utilizzi direttamente l'API Spanner. Il connettore fornisce un flusso di record delle modifiche ai dati in modo da poterti concentrare maggiormente sulla logica dell'applicazione e meno su dettagli specifici dell'API e sul partizionamento dinamico delle modifiche in tempo reale. Ti consigliamo di utilizzare il connettore SpannerIO anziché l'API Spanner nella maggior parte dei casi in cui devi leggere i dati delle modifiche in tempo reale.

I modelli Dataflow sono pipeline Dataflow predefinite che implementano casi d'uso comuni. Per una panoramica, consulta Modelli Dataflow.

Pipeline Dataflow

Una pipeline Dataflow dei modifiche in tempo reale di Spanner è composta da quattro parti principali:

  1. Un database Spanner con un flusso di modifiche
  2. Il connettore SpannerIO
  3. Trasformazioni e sink definiti dall'utente
  4. Un autore I/O sink

immagine

Ognuna di queste opzioni è discussa in modo più dettagliato di seguito.

Flusso di modifiche di Spanner

Per informazioni dettagliate su come creare una modifica in tempo reale, consulta Creare una modifica in tempo reale.

Connettore SpannerIO Apache Beam

Questo è il connettore SpannerIO descritto in precedenza. È un connettore I/O di origine che emette un PCollection di record di modifiche dei dati per le fasi successive della pipeline. La data e ora dell'evento per ogni record di modifica dei dati emesso corrisponderà al timestamp di commit. Tieni presente che i record emessi sono non ordinati e che il connettore SpannerIO garantisce che non ci saranno record in ritardo.

Quando lavori con modifiche in tempo reale, Dataflow utilizza i checkpoint. Di conseguenza, ogni worker potrebbe attendere fino all'intervallo dei checkpoint configurato per il buffering delle modifiche prima di inviare le modifiche per un'ulteriore elaborazione.

Trasformazioni definite dall'utente

Una trasformazione definita dall'utente consente a un utente di aggregare, trasformare o modificare i dati di elaborazione all'interno di una pipeline Dataflow. I casi d'uso comuni di questo approccio sono la rimozione di informazioni che consentono l'identificazione personale, il rispetto dei requisiti di formato dei dati downstream e l'ordinamento. Consulta la documentazione ufficiale di Apache Beam per consultare la guida alla programmazione sulle transforms.

Writer I/O sink Apache Beam

Apache Beam contiene trasformazioni I/O integrate che possono essere utilizzate per scrivere da una pipeline Dataflow in un sink di dati come BigQuery. I data sink più comuni sono supportati in modo nativo.

Modelli Dataflow

I modelli Dataflow offrono un modo semplice per creare job Dataflow basati su immagini Docker predefinite per casi d'uso comuni tramite la console Google Cloud, l'interfaccia a riga di comando di Google Cloud o le chiamate API Rest.

Per le modifiche in tempo reale di Spanner, forniamo tre modelli flessibili di Dataflow:

Crea una pipeline Dataflow

Questa sezione illustra la configurazione iniziale del connettore e fornisce esempi di integrazioni comuni con la funzionalità dei modifiche in tempo reale di Spanner.

Per seguire questi passaggi, è necessario un ambiente di sviluppo Java per Dataflow. Per ulteriori informazioni, consulta Creare una pipeline Dataflow utilizzando Java.

Crea un flusso di modifiche

Per informazioni dettagliate su come creare una modifica in tempo reale, consulta Creare una modifica in tempo reale. Per continuare con i passaggi successivi, devi avere un database Spanner con un flusso di modifiche configurato.

Concedi privilegi granulari per controllo dell'accesso dell'accesso

Se prevedi che qualsiasi utente controllo dell'accesso granulare esegua il job Dataflow, assicurati che agli utenti venga concesso l'accesso a un ruolo di database con il privilegio SELECT per il flusso di modifiche e il privilegio EXECUTE per la funzione con valori di tabella del flusso di modifiche. Assicurati inoltre che l'entità specifichi il ruolo del database nella configurazione SpannerIO o nel modello flessibile di Dataflow.

Per maggiori dettagli, vedi Informazioni sul controllo dell'accesso granulare.

Aggiungi il connettore SpannerIO come dipendenza

Il connettore SpannerIO di Apache Beam incapsula la complessità del consumo dei modifiche in tempo reale direttamente tramite l'API Cloud Spanner, inviando una PCollection di record di dati dei flussi di modifiche alle fasi successive della pipeline.

Questi oggetti possono essere utilizzati in altre fasi della pipeline Dataflow dell'utente. L'integrazione delle modifiche in tempo reale fa parte del connettore SpannerIO. Per poter utilizzare il connettore SpannerIO, la dipendenza deve essere aggiunta al file pom.xml:

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
  <version>${beam-version}</version> <!-- available from version 2.38.0 -->
</dependency>

crea un database di metadati

Il connettore deve tenere traccia di ogni partizione durante l'esecuzione della pipeline Apache Beam. Conserva questi metadati in una tabella Spanner creata dal connettore durante l'inizializzazione. Devi specificare il database in cui verrà creata questa tabella durante la configurazione del connettore.

Come descritto nella sezione Best practice per le modifiche in tempo reale, ti consigliamo di creare un nuovo database a questo scopo, anziché consentire al connettore di utilizzare il database dell'applicazione per archiviare la tabella dei metadati.

Il proprietario di un job Dataflow che utilizza il connettore SpannerIO deve avere le seguenti autorizzazioni IAM impostate con questo database di metadati:

  • spanner.databases.updateDdl
  • spanner.databases.beginReadOnlyTransaction
  • spanner.databases.beginOrRollbackReadWriteTransaction
  • spanner.databases.read
  • spanner.databases.select
  • spanner.databases.write
  • spanner.sessions.create
  • spanner.sessions.get

Configura il connettore

Il connettore delle modifiche in tempo reale di Spanner può essere configurato come segue:

SpannerConfig spannerConfig = SpannerConfig
  .create()
  .withProjectId("my-project-id")
  .withInstanceId("my-instance-id")
  .withDatabaseId("my-database-id")
  .withDatabaseRole("my-database-role");    // Needed for fine-grained access control only

Timestamp startTime = Timestamp.now();
Timestamp endTime = Timestamp.ofTimeSecondsAndNanos(
   startTime.getSeconds() + (10 * 60),
   startTime.getNanos()
);

SpannerIO
  .readChangeStream()
  .withSpannerConfig(spannerConfig)
  .withChangeStreamName("my-change-stream")
  .withMetadataInstance("my-meta-instance-id")
  .withMetadataDatabase("my-meta-database-id")
  .withMetadataTable("my-meta-table-name")
  .withRpcPriority(RpcPriority.MEDIUM)
  .withInclusiveStartAt(startTime)
  .withInclusiveEndAt(endTime);

Di seguito sono riportate le descrizioni delle opzioni di readChangeStream():

Configurazione Spanner (obbligatoria)

Utilizzato per configurare il progetto, l'istanza e il database da cui è stato creato il flusso di modifiche e da cui è necessario eseguire query. Facoltativamente, specifica anche il ruolo del database da utilizzare se l'entità IAM che esegue il job Dataflow è un utente di controllo dell'accesso granulare. Il job assume questo ruolo di database per l'accesso al flusso di modifiche. Per saperne di più, consulta Informazioni sul controllo dell'accesso granulare.

Nome del flusso di modifiche (obbligatorio)

Questo nome identifica in modo univoco il flusso di modifiche. Il nome qui deve essere lo stesso utilizzato durante la creazione.

ID istanza di metadati (facoltativo)

Questa è l'istanza in cui archiviare i metadati utilizzati dal connettore per controllare l'utilizzo dei dati dell'API Change Stream.

ID database di metadati (obbligatorio)

Questo è il database in cui archiviare i metadati utilizzati dal connettore per controllare l'utilizzo dei dati dell'API Change Stream.

Nome tabella dei metadati (facoltativo)

Deve essere utilizzato solo in caso di aggiornamento di una pipeline esistente.

Questo è il nome della tabella di metadati preesistente che deve essere utilizzato dal connettore. Viene utilizzato dal connettore per archiviare i metadati e controllare il consumo dei dati dell'API Change Stream. Se questa opzione viene omessa, Spanner crea una nuova tabella con un nome generato all'inizializzazione del connettore.

Priorità RPC (facoltativo)

La priorità della richiesta da utilizzare per le query in tempo reale delle modifiche. Se questo parametro viene omesso, verrà utilizzato high priority.

InclusiveStartAt (obbligatorio)

Le modifiche al timestamp specificato vengono restituite al chiamante.

InclusiveEndAt (facoltativo)

Le modifiche apportate fino al timestamp specificato vengono restituite al chiamante. Se questo parametro viene omesso, le modifiche verranno emesse a tempo indeterminato.

Aggiungere trasformazioni e sink per elaborare i dati delle modifiche

Una volta completati i passaggi precedenti, il connettore SpannerIO configurato è pronto per emettere una PCollection di oggetti DataChangeRecord. Consulta Esempio di trasformazioni e sink per varie configurazioni di pipeline di esempio che elaborano i flussi di dati in vari modi.

Tieni presente che i record di modifiche in tempo reale emessi dal connettore SpannerIO non sono ordinati. Questo perché le PCollection non forniscono alcuna garanzia di ordinazione. Se hai bisogno di un flusso ordinato, devi raggruppare e ordinare i record come trasformazioni nelle tue pipeline: consulta Esempio: ordine per chiave. Puoi estendere questo campione per ordinare i record in base a qualsiasi campo dei record, ad esempio per ID transazione.

Esempi di trasformazioni e sink

Puoi definire le tue trasformazioni e specificare i sink in cui scrivere i dati. La documentazione di Apache Beam offre una miriade di transforms che possono essere applicate, oltre ai connettori I/O pronti all'uso per scrivere dati in sistemi esterni.

Esempio: ordina per chiave

Questo esempio di codice invia record delle modifiche ai dati ordinati in base al timestamp di commit e raggruppati per chiavi primarie mediante il connettore Dataflow.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new BreakRecordByModFn()))
  .apply(ParDo.of(new KeyByIdFn()))
  .apply(ParDo.of(new BufferKeyUntilOutputTimestamp()))
  // Subsequent processing goes here

Questo esempio di codice utilizza stati e timer per eseguire il buffer dei record per ciascuna chiave e imposta la scadenza del timer su un intervallo di tempo configurato dall'utente T nel futuro (definito nella funzione BufferKeyUntilOutputTimestamp). Quando la filigrana di Dataflow supera l'ora T, questo codice svuota tutti i record nel buffer con timestamp inferiore a T, ordina questi record in base al timestamp di commit e restituisce una coppia chiave-valore in cui:

  • La chiave è la chiave di input, ovvero la chiave primaria sottoposta ad hashing in un array di bucket di dimensione 1000.
  • Il valore corrisponde ai record ordinati delle modifiche dei dati inseriti nel buffer per la chiave.

Per ogni chiave, abbiamo le seguenti garanzie:

  • I timer devono essere attivati in ordine di timestamp di scadenza.
  • È garantito che gli elementi ricevano gli elementi nello stesso ordine in cui sono stati prodotti nelle fasi a valle.

Ad esempio, supponiamo che per una chiave con valore 100 il timer si attivi rispettivamente in T1 e T10, generando un bundle di record di modifica dei dati a ogni timestamp. Poiché i record di modifica dei dati generati in T1 sono stati generati prima dei record di modifica dei dati generati in T10, i record di modifica dei dati inviati in T1 devono essere ricevuti entro la fase successiva prima che vengano ricevuti i record di modifica dei dati inviati in T10. Questo meccanismo ci aiuta a garantire un ordine rigoroso del timestamp di commit per chiave primaria per l'elaborazione downstream.

Il processo si ripeterà fino al termine della pipeline e fino a quando tutti i record delle modifiche ai dati non saranno stati elaborati (oppure si ripeterà a tempo indeterminato se non viene specificata un'ora di fine).

Tieni presente che questo esempio di codice utilizza stati e timer, anziché finestre, per eseguire l'ordinamento in base alla chiave. La motivazione è che non è garantito che le finestre vengano elaborate nell'ordine indicato. Ciò significa che le finestre meno recenti possono essere elaborate in un secondo momento rispetto a quelle più recenti, il che potrebbe causare l'elaborazione dell'ordine.

BreakRecordByModFn

Ogni record di modifica dei dati può contenere diverse mod. Ogni mod rappresenta un inserimento, un aggiornamento o un'eliminazione in un singolo valore di chiave primaria. Questa funzione suddivide ogni record delle modifiche dei dati in record separati delle modifiche, uno per modifica.

private static class BreakRecordByModFn extends DoFn<DataChangeRecord,
                                                     DataChangeRecord>  {
  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record, OutputReceiver<DataChangeRecord>
    outputReceiver) {
    record.getMods().stream()
      .map(
          mod ->
              new DataChangeRecord(
                  record.getPartitionToken(),
                  record.getCommitTimestamp(),
                  record.getServerTransactionId(),
                  record.isLastRecordInTransactionInPartition(),
                  record.getRecordSequence(),
                  record.getTableName(),
                  record.getRowType(),
                  Collections.singletonList(mod),
                  record.getModType(),
                  record.getValueCaptureType(),
                  record.getNumberOfRecordsInTransaction(),
                  record.getNumberOfPartitionsInTransaction(),
                  record.getTransactionTag(),
                  record.isSystemTransaction(),
                  record.getMetadata()))
      .forEach(outputReceiver::output);
  }
}

KeyByIdFn

Questa funzione prende un DataChangeRecord e restituisce un DataChangeRecord con chiave primaria di Spanner con hash e generare un valore intero.

private static class KeyByIdFn extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>>  {
  // NUMBER_OF_BUCKETS should be configured by the user to match their key cardinality
  // Here, we are choosing to hash the Spanner primary keys to a bucket index, in order to have a deterministic number
  // of states and timers for performance purposes.
  // Note that having too many buckets might have undesirable effects if it results in a low number of records per bucket
  // On the other hand, having too few buckets might also be problematic, since many keys will be contained within them.
  private static final int NUMBER_OF_BUCKETS = 1000;

  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record,
      OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
    int hashCode = (int) record.getMods().get(0).getKeysJson().hashCode();
    // Hash the received keys into a bucket in order to have a
    // deterministic number of buffers and timers.
    String bucketIndex = String.valueOf(hashCode % NUMBER_OF_BUCKETS);

    outputReceiver.output(KV.of(bucketIndex, record));
  }
}

BufferKeyUntilOutputTimestamp

I timer e i buffer variano in base alla chiave. Questa funzione memorizza nel buffer ogni record di modifica dei dati fino a quando la filigrana non supera il timestamp in cui vogliamo generare i record delle modifiche ai dati memorizzati nel buffer.

Questo codice utilizza un timer di loop per determinare quando eliminare il buffer:

  1. Quando rileva per la prima volta un record di modifica dei dati per una chiave, imposta il timer per l'attivazione al timestamp di commit del record di modifica dei dati + incrementIntervalSeconds (un'opzione configurabile dall'utente).
  2. Quando il timer si attiva, aggiunge tutti i record di modifica dei dati nel buffer con un timestamp inferiore alla data di scadenza del timer al valore recordsToOutput. Se il buffer contiene record di modifica dei dati il cui timestamp è maggiore o uguale alla data di scadenza del timer, questi record vengono aggiunti di nuovo nel buffer anziché essere generati. Quindi imposta il timer successivo alla scadenza di quello corrente più incrementIntervalInSeconds.
  3. Se recordsToOutput non è vuoto, la funzione ordina i record di modifica dei dati in recordsToOutput in base al timestamp di commit e all'ID transazione, quindi li restituisce.
private static class BufferKeyUntilOutputTimestamp extends
    DoFn<KV<String, DataChangeRecord>, KV<String, Iterable<DataChangeRecord>>>  {
  private static final Logger LOG =
      LoggerFactory.getLogger(BufferKeyUntilOutputTimestamp.class);

  private final long incrementIntervalInSeconds = 2;

  private BufferKeyUntilOutputTimestamp(long incrementIntervalInSeconds) {
    this.incrementIntervalInSeconds = incrementIntervalInSeconds;
  }

  @SuppressWarnings("unused")
  @TimerId("timer")
  private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @StateId("buffer")
  private final StateSpec<BagState<DataChangeRecord>> buffer = StateSpecs.bag();

  @StateId("keyString")
  private final StateSpec<ValueState<String>> keyString =
      StateSpecs.value(StringUtf8Coder.of());

  @ProcessElement
  public void process(
      @Element KV<String, DataChangeRecord> element,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @TimerId("timer") Timer timer,
      @StateId("keyString") ValueState<String> keyString) {
    buffer.add(element.getValue());

    // Only set the timer if this is the first time we are receiving a data change
    // record with this key.
    String elementKey = keyString.read();
    if (elementKey == null) {
      Instant commitTimestamp =
          new Instant(element.getValue().getCommitTimestamp().toSqlTimestamp());
      Instant outputTimestamp =
          commitTimestamp.plus(Duration.standardSeconds(incrementIntervalInSeconds));
      timer.set(outputTimestamp);
      keyString.write(element.getKey());
    }
  }

  @OnTimer("timer")
  public void onExpiry(
      OnTimerContext context,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @TimerId("timer") Timer timer,
      @StateId("keyString") ValueState<String> keyString) {
    if (!buffer.isEmpty().read()) {
      String elementKey = keyString.read();

      final List<DataChangeRecord> records =
          StreamSupport.stream(buffer.read().spliterator(), false)
              .collect(Collectors.toList());
      buffer.clear();

      List<DataChangeRecord> recordsToOutput = new ArrayList<>();
      for (DataChangeRecord record : records) {
        Instant recordCommitTimestamp =
            new Instant(record.getCommitTimestamp().toSqlTimestamp());
        final String recordString =
            record.getMods().get(0).getNewValuesJson().isEmpty()
                ? "Deleted record"
                : record.getMods().get(0).getNewValuesJson();
        // When the watermark passes time T, this means that all records with
        // event time < T have been processed and successfully committed. Since the
        // timer fires when the watermark passes the expiration time, we should
        // only output records with event time < expiration time.
        if (recordCommitTimestamp.isBefore(context.timestamp())) {
          LOG.info(
             "Outputting record with key {} and value {} at expiration " +
             "timestamp {}",
              elementKey,
              recordString,
              context.timestamp().toString());
          recordsToOutput.add(record);
        } else {
          LOG.info(
              "Expired at {} but adding record with key {} and value {} back to " +
              "buffer due to commit timestamp {}",
              context.timestamp().toString(),
              elementKey,
              recordString,
              recordCommitTimestamp.toString());
          buffer.add(record);
        }
      }

      // Output records, if there are any to output.
      if (!recordsToOutput.isEmpty()) {
        // Order the records in place, and output them. The user would need
        // to implement DataChangeRecordComparator class that sorts the
        // data change records by commit timestamp and transaction ID.
        Collections.sort(recordsToOutput, new DataChangeRecordComparator());
        context.outputWithTimestamp(
            KV.of(elementKey, recordsToOutput), context.timestamp());
        LOG.info(
            "Expired at {}, outputting records for key {}",
            context.timestamp().toString(),
            elementKey);
      } else {
        LOG.info("Expired at {} with no records", context.timestamp().toString());
      }
    }

    Instant nextTimer = context.timestamp().plus(Duration.standardSeconds(incrementIntervalInSeconds));
    if (buffer.isEmpty() != null && !buffer.isEmpty().read()) {
      LOG.info("Setting next timer to {}", nextTimer.toString());
      timer.set(nextTimer);
    } else {
      LOG.info(
          "Timer not being set since the buffer is empty: ");
      keyString.clear();
    }
  }
}

Transazioni degli ordini

Questa pipeline può essere cambiata in ordine in base all'ID transazione e al timestamp di commit. A questo scopo, memorizza nel buffer i record per ogni coppia di ID transazione / timestamp di commit, anziché per ogni chiave Spanner. Ciò richiede la modifica del codice in KeyByIdFn.

Esempio: raggruppare le transazioni

Questo esempio di codice legge i record delle modifiche dei dati, assembla tutti i record delle modifiche dei dati appartenenti alla stessa transazione in un singolo elemento e genera questo elemento. Tieni presente che le transazioni generate da questo codice campione non sono ordinate in base al timestamp di commit.

Questo esempio di codice utilizza i buffer per combinare le transazioni dai record di modifica dei dati. Dopo aver ricevuto per la prima volta un record di modifica dei dati appartenente a una transazione, legge il campo numberOfRecordsInTransaction nel record di modifica dei dati, che descrive il numero previsto di record di modifica dei dati appartenenti alla transazione. Memorizza nel buffer i record delle modifiche dei dati appartenenti alla transazione fino a quando il numero di record nel buffer non corrisponde a numberOfRecordsInTransaction, quindi genera i record delle modifiche dei dati raggruppati.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new KeyByTransactionIdFn()))
  .apply(ParDo.of(new TransactionBoundaryFn()))
  // Subsequent processing goes here

KeyByTransactionIdFn

Questa funzione prende un DataChangeRecord e restituisce un DataChangeRecord codificato dall'ID transazione.

private static class KeyByTransactionIdFn extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>>  {
  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record,
      OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
    outputReceiver.output(KV.of(record.getServerTransactionId(), record));
  }
}

TransactionBoundaryFn

TransactionBoundaryFn memorizza nel buffer coppie chiave-valore di {TransactionId, DataChangeRecord} ricevute da KeyByTransactionIdFn e le memorizza nel buffer in gruppi basati su TransactionId. Quando il numero di record nel buffer è uguale al numero di record contenuti nell'intera transazione, questa funzione ordina gli oggetti DataChangeRecord nel gruppo per sequenza di record e restituisce una coppia chiave-valore {CommitTimestamp, TransactionId}, Iterable<DataChangeRecord>.

In questo caso, supponiamo che SortKey sia una classe definita dall'utente che rappresenta una coppia {CommitTimestamp, TransactionId}. Consulta l'implementazione di esempio per SortKey.

private static class TransactionBoundaryFn extends DoFn<KV<String, DataChangeRecord>, KV<SortKey, Iterable<DataChangeRecord>>>  {
  @StateId("buffer")
  private final StateSpec<BagState<DataChangeRecord>> buffer = StateSpecs.bag();

  @StateId("count")
  private final StateSpec<ValueState<Integer>> countState = StateSpecs.value();

  @ProcessElement
  public void process(
      ProcessContext context,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @StateId("count") ValueState<Integer> countState) {
    final KV<String, DataChangeRecord> element = context.element();
    final DataChangeRecord record = element.getValue();

    buffer.add(record);
    int count = (countState.read() != null ? countState.read() : 0);
    count = count + 1;
    countState.write(count);

    if (count == record.getNumberOfRecordsInTransaction()) {
      final List<DataChangeRecord> sortedRecords =
          StreamSupport.stream(buffer.read().spliterator(), false)
              .sorted(Comparator.comparing(DataChangeRecord::getRecordSequence))
              .collect(Collectors.toList());

      final Instant commitInstant =
          new Instant(sortedRecords.get(0).getCommitTimestamp().toSqlTimestamp()
              .getTime());
      context.outputWithTimestamp(
          KV.of(
              new SortKey(sortedRecords.get(0).getCommitTimestamp(),
                          sortedRecords.get(0).getServerTransactionId()),
              sortedRecords),
          commitInstant);
      buffer.clear();
      countState.clear();
    }
  }
}

Esempio: filtrare per tag transazione

Quando viene applicato un tag a una transazione che modifica dati utente, il tag corrispondente e il tipo vengono archiviati come parte di DataChangeRecord. Questi esempi mostrano come filtrare i record di modifiche in tempo reale in base ai tag transazione definiti dall'utente e ai tag di sistema:

Filtro dei tag definito dall'utente per my-tx-tag:

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(Filter.by(record ->
           !record.isSystemTransaction()
           && record.getTransactionTag().equalsIgnoreCase("my-tx-tag")))
  // Subsequent processing goes here

Controllo TTL/Filtro dei tag di sistema:

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(Filter.by(record ->
           record.isSystemTransaction()
           && record.getTransactionTag().equals("RowDeletionPolicy")))
  // Subsequent processing goes here

Esempio: recupera riga intera

Questo esempio funziona con una tabella Spanner denominata Singer che ha la definizione seguente:

CREATE TABLE Singers (
  SingerId INT64 NOT NULL,
  FirstName STRING(1024),
  LastName STRING(1024)
) PRIMARY KEY (SingerId);

In una modalità di acquisizione dei valori OLD_AND_NEW_VALUES predefinita delle modifiche in tempo reale, in caso di aggiornamento di una riga di Spanner, il record delle modifiche dei dati ricevuto conterrà solo le colonne che sono state modificate. Le colonne monitorate ma non modificate non verranno incluse nel record. La chiave primaria della mod può essere utilizzata per leggere lo snapshot di Spanner al timestamp di commit del record di modifica dei dati per recuperare le colonne invariate o persino recuperare la riga intera.

Tieni presente che il criterio di conservazione del database potrebbe dover essere modificato in un valore maggiore o uguale al criterio di conservazione delle modifiche in tempo reale affinché la lettura dello snapshot abbia esito positivo.

Tieni inoltre presente che l'utilizzo del tipo di acquisizione del valore NEW_ROW è il modo consigliato e più efficiente per farlo, poiché restituisce tutte le colonne monitorate della riga per impostazione predefinita e non richiede la lettura di snapshot aggiuntiva in Spanner.

SpannerConfig spannerConfig = SpannerConfig
   .create()
   .withProjectId("my-project-id")
   .withInstanceId("my-instance-id")
   .withDatabaseId("my-database-id")
   .withDatabaseRole("my-database-role");   // Needed for fine-grained access control only

pipeline
   .apply(SpannerIO
       .readChangeStream()
       .withSpannerConfig(spannerConfig)
       // Assume we have a change stream "my-change-stream" that watches Singers table.
       .withChangeStreamName("my-change-stream")
       .withMetadataInstance("my-metadata-instance-id")
       .withMetadataDatabase("my-metadata-database-id")
       .withInclusiveStartAt(Timestamp.now()))
   .apply(ParDo.of(new ToFullRowJsonFn(spannerConfig)))
   // Subsequent processing goes here

ToFullRowJsonFn

Questa trasformazione eseguirà una lettura inattiva al timestamp di commit di ogni record ricevuto e mappa l'intera riga a JSON.

public class ToFullRowJsonFn extends DoFn<DataChangeRecord, String> {
 // Since each instance of this DoFn will create its own session pool and will
 // perform calls to Spanner sequentially, we keep the number of sessions in
 // the pool small. This way, we avoid wasting resources.
 private static final int MIN_SESSIONS = 1;
 private static final int MAX_SESSIONS = 5;
 private final String projectId;
 private final String instanceId;
 private final String databaseId;

 private transient DatabaseClient client;
 private transient Spanner spanner;

 public ToFullRowJsonFn(SpannerConfig spannerConfig) {
   this.projectId = spannerConfig.getProjectId().get();
   this.instanceId = spannerConfig.getInstanceId().get();
   this.databaseId = spannerConfig.getDatabaseId().get();
 }

 @Setup
 public void setup() {
   SessionPoolOptions sessionPoolOptions = SessionPoolOptions
      .newBuilder()
      .setMinSessions(MIN_SESSIONS)
      .setMaxSessions(MAX_SESSIONS)
      .build();
   SpannerOptions options = SpannerOptions
       .newBuilder()
       .setProjectId(projectId)
       .setSessionPoolOption(sessionPoolOptions)
       .build();
   DatabaseId id = DatabaseId.of(projectId, instanceId, databaseId);
   spanner = options.getService();
   client = spanner.getDatabaseClient(id);
 }

 @Teardown
 public void teardown() {
   spanner.close();
 }

 @ProcessElement
 public void process(
   @Element DataChangeRecord element,
   OutputReceiver<String> output) {
   com.google.cloud.Timestamp commitTimestamp = element.getCommitTimestamp();
   element.getMods().forEach(mod -> {
     JSONObject keysJson = new JSONObject(mod.getKeysJson());
     JSONObject newValuesJson = new JSONObject(mod.getNewValuesJson());
     ModType modType = element.getModType();
     JSONObject jsonRow = new JSONObject();
     long singerId = keysJson.getLong("SingerId");
     jsonRow.put("SingerId", singerId);
     if (modType == ModType.INSERT) {
       // For INSERT mod, get non-primary key columns from mod.
       jsonRow.put("FirstName", newValuesJson.get("FirstName"));
       jsonRow.put("LastName", newValuesJson.get("LastName"));
     } else if (modType == ModType.UPDATE) {
       // For UPDATE mod, get non-primary key columns by doing a snapshot read using the primary key column from mod.
       try (ResultSet resultSet = client
         .singleUse(TimestampBound.ofReadTimestamp(commitTimestamp))
         .read(
           "Singers",
           KeySet.singleKey(com.google.cloud.spanner.Key.of(singerId)),
             Arrays.asList("FirstName", "LastName"))) {
         if (resultSet.next()) {
           jsonRow.put("FirstName", resultSet.isNull("FirstName") ?
             JSONObject.NULL : resultSet.getString("FirstName"));
           jsonRow.put("LastName", resultSet.isNull("LastName") ?
             JSONObject.NULL : resultSet.getString("LastName"));
         }
       }
     } else {
       // For DELETE mod, there is nothing to do, as we already set SingerId.
     }

     output.output(jsonRow.toString());
   });
 }
}

Questo codice crea un client di database Spanner per eseguire il recupero dell'intera riga e configura il pool di sessioni in modo che abbia solo alcune sessioni, eseguendo le letture in un'istanza di ToFullRowJsonFn in sequenza. Dataflow si assicura di generare molte istanze di questa funzione, ciascuna con il proprio pool client.

Esempio: da Spanner a Pub/Sub

In questo scenario, il chiamante trasmette i flussi di dati a Pub/Sub il più rapidamente possibile, senza alcun raggruppamento o aggregazione. È una soluzione ideale per attivare l'elaborazione downstream, come il trasferimento di flussi di tutte le nuove righe inserite in una tabella Spanner a Pub/Sub per l'ulteriore elaborazione.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(MapElements.into(TypeDescriptors.strings()).via(Object::toString))
  .apply(PubsubIO.writeStrings().to("my-topic"));

Tieni presente che il sink Pub/Sub può essere configurato per assicurare la semantica exactly-once.

Esempio: da Spanner a Cloud Storage

In questo scenario, il chiamante raggruppa tutti i record all'interno di una determinata finestra e salva il gruppo in file Cloud Storage separati. È una soluzione adeguata per l'analisi e l'archiviazione point-in-time, che è indipendente dal periodo di conservazione di Spanner.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(MapElements.into(TypeDescriptors.strings()).via(Object::toString))
  .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
  .apply(TextIO
    .write()
    .to("gs://my-bucket/change-stream-results-")
    .withSuffix(".txt")
    .withWindowedWrites()
    .withNumShards(1));

Tieni presente che il sink di Cloud Storage fornisce la semantica "at-least-once" per impostazione predefinita. Con ulteriore elaborazione, può essere modificato in modo che abbia una semantica "exactly-once".

Forniamo anche un modello Dataflow per questo caso d'uso: consulta Collegare modifiche in tempo reale a Cloud Storage.

Esempio: da Spanner a BigQuery (tabella contabile)

In questo caso, il chiamante trasmette i flussi di record delle modifiche in BigQuery. Ogni record di modifica dei dati viene riportato come una riga in BigQuery. È adatto per l'analisi. Questo codice utilizza le funzioni definite in precedenza, nella sezione Recupera riga completa, per recuperare la riga completa del record e scriverla in BigQuery.

SpannerConfig spannerConfig = SpannerConfig
  .create()
  .withProjectId("my-project-id")
  .withInstanceId("my-instance-id")
  .withDatabaseId("my-database-id")
  .withDatabaseRole("my-database-role");   // Needed for fine-grained access control only

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(spannerConfig)
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new ToFullRowJsonFn(spannerConfig)))
  .apply(BigQueryIO
    .<String>write()
    .to("my-bigquery-table")
    .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
    .withWriteDisposition(Write.WriteDisposition.WRITE_APPEND)
    .withSchema(new TableSchema().setFields(Arrays.asList(
      new TableFieldSchema()
        .setName("SingerId")
        .setType("INT64")
        .setMode("REQUIRED"),
      new TableFieldSchema()
        .setName("FirstName")
        .setType("STRING")
        .setMode("REQUIRED"),
      new TableFieldSchema()
        .setName("LastName")
        .setType("STRING")
        .setMode("REQUIRED")
    )))
    .withAutoSharding()
    .optimizedWrites()
    .withFormatFunction((String element) -> {
      ObjectMapper objectMapper = new ObjectMapper();
      JsonNode jsonNode = null;
      try {
        jsonNode = objectMapper.readTree(element);
      } catch (IOException e) {
        e.printStackTrace();
      }
      return new TableRow()
        .set("SingerId", jsonNode.get("SingerId").asInt())
        .set("FirstName", jsonNode.get("FirstName").asText())
        .set("LastName", jsonNode.get("LastName").asText());
    }
  )
);

Tieni presente che il sink BigQuery fornisce la semantica "at-least-once" per impostazione predefinita. Con ulteriore elaborazione, può essere modificato in modo che abbia una semantica "exactly-once".

Forniamo anche un modello Dataflow per questo caso d'uso; consulta Collegare modifiche in tempo reale a BigQuery.

Monitora una pipeline

Sono disponibili due classi di metriche per monitorare una pipeline Dataflow di modifiche in tempo reale.

Metriche Dataflow standard

Dataflow fornisce diverse metriche per garantire l'integrità del job, ad esempio aggiornamento dei dati, ritardo di sistema, velocità effettiva del job, utilizzo della CPU dei worker e altro ancora. Puoi trovare ulteriori informazioni in Utilizzo di Monitoring per le pipeline di Dataflow.

Per le pipeline dei modifiche in tempo reale, ci sono due metriche principali che devono essere prese in considerazione: la latenza di sistema e l'aggiornamento dei dati.

La latenza del sistema indicherà la durata massima attuale (in secondi) per cui un elemento di dati è stato elaborato o in attesa di elaborazione.

L'aggiornamento dei dati mostra la quantità di tempo che intercorre tra adesso (in tempo reale) e la filigrana di output. La filigrana di output con data T indica che tutti gli elementi con un'ora dell'evento (rigorosamente) prima di T sono stati elaborati per il calcolo. In altre parole, l'aggiornamento dei dati misura il livello di attualità della pipeline per quanto riguarda l'elaborazione degli eventi ricevuti.

Se le risorse della pipeline sono insufficienti, puoi riscontrare questo risultato in queste due metriche. La latenza del sistema aumenterà, perché gli elementi devono attendere più a lungo prima di essere elaborati. Anche l'aggiornamento dei dati aumenterà, perché la pipeline non sarà in grado di tenere il passo con la quantità di dati ricevuti.

Metriche personalizzate relative alle modifiche in tempo reale

Queste metriche sono esposte in Cloud Monitoring e includono:

  • Latenza in bucket (istogramma) tra il commit di un record in Spanner e l'emissione in una PCollection da parte del connettore. Questa metrica può essere utilizzata per individuare eventuali problemi di prestazioni (latenza) della pipeline.
  • Numero totale di record di dati letti. Si tratta di un'indicazione complessiva del numero di record emessi dal connettore. Questo numero dovrebbe essere in costante aumento, rispecchiando la tendenza delle scritture nel database Spanner sottostante.
  • Numero di partizioni attualmente in lettura. Devono essere sempre presenti partizioni da leggere. Se questo numero è zero, significa che si è verificato un errore nella pipeline.
  • Numero totale di query eseguite durante l'esecuzione del connettore. Si tratta di un'indicazione complessiva delle query di modifiche in tempo reale apportate all'istanza Spanner durante l'esecuzione della pipeline. Può essere utilizzato per ottenere una stima del carico dal connettore al database Spanner.

Aggiorna una pipeline esistente

È possibile aggiornare una pipeline in esecuzione che utilizza il connettore SpannerIO per elaborare i modifiche in tempo reale se i controlli di compatibilità dei job vengono superati. A tale scopo, devi impostare esplicitamente il parametro del nome della tabella dei metadati del nuovo job durante l'aggiornamento. Utilizza il valore dell'opzione della pipeline metadataTable del job che stai aggiornando.

Se utilizzi un modello Dataflow fornito da Google, imposta il nome della tabella utilizzando il parametro spannerMetadataTableName. Puoi anche modificare il job esistente per utilizzare in modo esplicito la tabella dei metadati con il metodo withMetadataTable(your-metadata-table-name) nella configurazione del connettore. Al termine, puoi seguire le istruzioni riportate in Avviare il job di sostituzione dalla documentazione di Dataflow per aggiornare un job in esecuzione.

Best practice per le modifiche in tempo reale e Dataflow

Di seguito sono riportate alcune best practice per la creazione di connessioni ai modifiche in tempo reale mediante Dataflow.

Utilizza un database di metadati separato

Ti consigliamo di creare un database separato per il connettore SpannerIO da utilizzare per l'archiviazione dei metadati, anziché configurarlo per l'utilizzo del database dell'applicazione.

Per scoprire di più, consulta la sezione Valutare un database di metadati separato.

Dimensione il cluster

Una regola generale per un numero iniziale di worker in un job di modifiche in tempo reale di Spanner è un worker ogni 1000 scritture al secondo. Tieni presente che questa stima può variare in base a diversi fattori, come le dimensioni di ogni transazione, il numero di record di modifiche in tempo reale prodotti da una singola transazione e altre trasformazioni, aggregazioni o sink utilizzati nella pipeline.

Dopo le risorse iniziali, è importante monitorare le metriche indicate in Monitorare una pipeline per garantirne l'integrità. Ti consigliamo di sperimentare le dimensioni iniziali del pool di worker e di monitorare in che modo la pipeline gestisce il carico, aumentando il numero di nodi, se necessario. L'utilizzo della CPU è una metrica chiave per verificare se il carico è corretto e se sono necessari più nodi.

Limitazioni note

Scalabilità automatica

Il supporto della scalabilità automatica per qualsiasi pipeline che include SpannerIO.readChangeStream richiede Apache Beam 2.39.0 o versioni successive.

Se utilizzi una versione di Apache Beam precedente a 2.39.0, le pipeline che includono SpannerIO.readChangeStream devono specificare esplicitamente l'algoritmo di scalabilità automatica come NONE, come descritto in Scalabilità automatica orizzontale.

Per scalare manualmente una pipeline Dataflow anziché utilizzare la scalabilità automatica, consulta Scalabilità manuale di una pipeline in modalità flusso.

Corridore V2

Il connettore dei modifiche in tempo reale di Spanner richiede Dataflow Runner V2. Deve essere specificato manualmente durante l'esecuzione, altrimenti verrà generato un errore. Puoi specificare Runner V2 configurando il job con --experiments=use_unified_worker,use_runner_v2.

Snapshot

Il connettore delle modifiche in tempo reale di Spanner non supporta gli snapshot di Dataflow.

Svuotamento in corso

Il connettore dei modifiche in tempo reale di Spanner non supporta il download di un job. È possibile annullare solo un job esistente.

Puoi anche aggiornare una pipeline esistente senza doverla arrestare.

OpenCensus

Per utilizzare OpenCensus per monitorare la pipeline, specifica la versione 0.28.3 o successive.

NullPointerException all'avvio della pipeline

Un bug nella versione 2.38.0 di Apache Beam può causare un NullPointerException all'avvio della pipeline in determinate condizioni. In questo modo il job non verrà avviato e verrà visualizzato questo messaggio di errore:

java.lang.NullPointerException: null value in entry: Cloud Storage_PROJECT_ID=null

Per risolvere questo problema, utilizza Apache Beam versione 2.39.0 o successive oppure specifica manualmente la versione di beam-sdks-java-core come 2.37.0:

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-core</artifactId>
  <version>2.37.0</version>
</dependency>

Ulteriori informazioni