Trasmettere in streaming le modifiche con Dataflow

Il connettore Bigtable Beam ti consente di utilizzare Dataflow per leggere i record delle modifiche dei dati di Bigtable senza dover monitorare o elaborare le modifiche delle partizioni nel codice, perché il connettore gestisce questa logica per te.

Questo documento descrive come configurare e utilizzare il connettore Bigtable Beam per leggere uno stream di modifiche utilizzando una pipeline Dataflow. Prima di leggere questo documento, devi leggere la Panoramica degli stream di modifiche e acquisire familiarità con Dataflow.

Alternative alla creazione di una pipeline personalizzata

Se non vuoi creare la tua pipeline Dataflow, puoi utilizzare una delle seguenti opzioni.

Puoi utilizzare un modello Dataflow fornito da Google.

Puoi anche utilizzare gli esempi di codice del tutorial o della guida rapida di Bigtable come punto di partenza per il tuo codice.

Assicurati che il codice che generi utilizzi google cloud libraries-bom versione 26.14.0 o successive.

Dettagli connettore

Il metodo del connettore Bigtable Beam, BigtableIO.readChangeStream, consente di leggere un flusso di record di modifica dei dati (ChangeStreamMutation) che puoi elaborare. Il connettore Bigtable Beam è un componente del repository GitHub di Apache Beam. Per una descrizione del codice del connettore, consulta i commenti all'indirizzo BigtableIO.java.

Devi utilizzare il connettore con Beam versione 2.48.0 o successive. Controlla il supporto del runtime di Apache Beam per assicurarti di utilizzare una versione supportata di Java. Poi puoi eseguire il deployment di una pipeline che utilizza il connettore per Dataflow, che gestisce il provisioning e la gestione delle risorse e contribuisce alla scalabilità e all'affidabilità dell'elaborazione dei dati di flusso.

Per saperne di più sul modello di programmazione Apache Beam, consulta la documentazione di Beam.

Raggruppamento dei dati senza orari degli eventi

I record delle modifiche dei dati trasmessi in streaming utilizzando il connettore Bigtable Beam non sono compatibili con le funzioni Dataflow che dipendono dagli orari degli eventi.

Come spiegato in Replica e filigrane, una filigrana bassa potrebbe non avanzare se la replica per la partizione non ha raggiunto il resto dell'istanza. Quando un watermark basso smette di avanzare, può causare l'interruzione dello stream delle modifiche.

Per evitare che lo stream si blocchi, il connettore Bigtable Beam restituisce tutti i dati con un timestamp di output pari a zero. Il timestamp zero fa sì che Dataflow consideri tutti i record di modifica dei dati come dati in ritardo. Di conseguenza, le funzionalità di Dataflow che dipendono dagli orari degli eventi non sono compatibili con modifiche in tempo reale di Bigtable. In particolare, non puoi utilizzare funzioni finestra, trigger basati sull'ora dell'evento o timer basati sull'ora dell'evento.

In alternativa, puoi utilizzare GlobalWindows con trigger temporali non basati su eventi per raggruppare questi dati tardivi in riquadri, come mostrato nell'esempio del tutorial. Per maggiori dettagli su trigger e riquadri, consulta la sezione Trigger nella guida alla programmazione di Beam.

Scalabilità automatica

Il connettore supporta lo scaling automatico di Dataflow, che è attivato per impostazione predefinita quando si utilizza Runner v2 (obbligatorio). L'algoritmo di scalabilità automatica di Dataflow tiene conto del backlog stimato dello stream di modifiche, che può essere monitorato nella pagina Monitoraggio di Dataflow nella sezione Backlog. Utilizza il flag --maxNumWorkers quando esegui il deployment di un job per limitare il numero di worker.

Per scalare manualmente la pipeline anziché utilizzare la scalabilità automatica, consulta Scalare manualmente una pipeline di streaming.

Limitazioni

Prima di utilizzare il connettore Bigtable Beam con Dataflow, tieni presente le seguenti limitazioni.

Dataflow Runner V2

Il connettore può essere eseguito solo utilizzando Dataflow Runner v2. Per attivare questa opzione, specifica --experiments=use_runner_v2 negli argomenti della riga di comando. L'esecuzione con Runner v1 causa l'errore della pipeline con la seguente eccezione:

java.lang.UnsupportedOperationException: BundleFinalizer unsupported by non-portable Dataflow

Snapshot

Il connettore non supporta gli snapshot Dataflow.

Duplicati

Il connettore Bigtable Beam trasmette in streaming le modifiche per ogni chiave di riga e per ogni cluster in ordine di timestamp di commit, ma poiché a volte riavvia da momenti precedenti dello stream, può produrre duplicati.

Riavvii della pipeline

Se una pipeline Dataflow è stata interrotta per un lungo periodo di tempo, i record di modifica dei dati possono rimanere indietro rispetto al limite di conservazione. Quando la pipeline viene ripresa, Bigtable la interrompe in modo da poter avviare una nuova pipeline con un nuovo orario di inizio richiesta che rientri nel periodo di conservazione. Bigtable esegue questa operazione, anziché avanzare silenziosamente l'ora della richiesta della pipeline originale, per evitare l'eliminazione involontaria di record di modifica dei dati con timestamp che rientrano al di fuori del periodo di conservazione specificato.

Prima di iniziare

Prima di utilizzare il connettore, completa i seguenti prerequisiti.

Configura l'autenticazione

Per utilizzare gli esempi di Java questa pagina in un ambiente di sviluppo locale, installa e inizializza gcloud CLI, quindi configura le Credenziali predefinite dell'applicazione con le tue credenziali utente.

    Installa Google Cloud CLI.

    Se utilizzi un provider di identità (IdP) esterno, devi prima accedere alla gcloud CLI con la tua identità federata.

    If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

Per ulteriori informazioni, vedi Set up authentication for a local development environment.

Per informazioni sulla configurazione dell'autenticazione per un ambiente di produzione, vedi Set up Application Default Credentials for code running on Google Cloud.

Abilitare un flusso di modifiche

Devi abilitare un flusso di modifiche in una tabella prima di poterla leggere. Puoi anche creare una nuova tabella con modifiche in tempo reale abilitati.

Modifica la tabella dei metadati dello stream

Quando trasmetti in streaming le modifiche con Dataflow, il connettore Bigtable Beam crea una tabella di metadati denominata __change_stream_md_table per impostazione predefinita. La tabella dei metadati del flusso di modifiche gestisce lo stato operativo del connettore e archivia i metadati sui record di modifica dei dati.

Per impostazione predefinita, il connettore crea la tabella nella stessa istanza della tabella di cui viene eseguito lo streaming. Per garantire il corretto funzionamento della tabella, il profilo dell'app per la tabella dei metadati deve utilizzare il routing a cluster singolo e avere le transazioni a riga singola abilitate.

Per ulteriori informazioni sullo streaming delle modifiche da Bigtable con il connettore Bigtable Beam, consulta la documentazione di BigtableIO.

Ruoli obbligatori

Per ottenere le autorizzazioni necessarie per leggere uno stream di modifiche di Bigtable utilizzando Dataflow, chiedi all'amministratore di concederti i seguenti ruoli IAM.

Per leggere le modifiche da Bigtable, devi disporre di questo ruolo:

  • Amministratore Bigtable (roles/bigtable.admin) nell'istanza Bigtable che contiene la tabella da cui prevedi di trasmettere in streaming le modifiche

Per eseguire il job Dataflow, devi disporre di questi ruoli:

Per ulteriori informazioni sulla concessione dei ruoli, consulta Gestire l'accesso.

Potresti anche riuscire a ottenere le autorizzazioni richieste tramite i ruoli personalizzati o altri ruoli predefiniti.

Aggiungi il connettore Bigtable Beam come dipendenza

Aggiungi al file pom.xml di Maven un codice simile alla seguente dipendenza. La versione deve essere 2.48.0 o successiva.

<dependencies>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
    <version>VERSION</version>
  </dependency>
</dependencies>

Leggere il flusso di modifiche

Per creare una pipeline Dataflow per leggere i record di modifica dei dati, configura il connettore, quindi aggiungi trasformazioni e sink. Quindi utilizzi il connettore per leggere gli oggetti ChangeStreamMutation in una pipeline Beam.

Gli esempi di codice in questa sezione, scritti in Java, mostrano come creare una pipeline e utilizzarla per convertire coppie chiave-valore in una stringa. Ogni coppia è costituita da una chiave di riga e da un oggetto ChangeStreamMutation. La pipeline converte le voci di ogni oggetto in una stringa separata da virgole.

Crea la pipeline

Questo esempio di codice Java mostra come creare la pipeline:

BigtableOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
Pipeline p = Pipeline.create(options);

final Instant startTime = Instant.now();

p.apply(
        "Read Change Stream",
        BigtableIO.readChangeStream()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .withAppProfileId(options.getBigtableAppProfile())
            .withStartTime(startTime))
    .apply(
        "Flatten Mutation Entries",
        FlatMapElements.into(TypeDescriptors.strings())
            .via(ChangeStreamsHelloWorld::mutationEntriesToString))
    .apply(
        "Print mutations",
        ParDo.of(
            new DoFn<String, Void>() { // a DoFn as an anonymous inner class instance
              @ProcessElement
              public void processElement(@Element String mutation) {
                System.out.println("Change captured: " + mutation);
              }
            }));
p.run();

Elabora i record di modifica dei dati

Questo esempio mostra come scorrere tutte le voci di un record di modifica dei dati per una riga e chiamare un metodo di conversione in stringa in base al tipo di voce.

Per un elenco dei tipi di voci che un record di modifica dei dati può contenere, vedi Che cosa contiene un record di modifica dei dati.

static List<String> mutationEntriesToString(KV<ByteString, ChangeStreamMutation> mutationPair) {
  List<String> mutations = new ArrayList<>();
  String rowKey = mutationPair.getKey().toStringUtf8();
  ChangeStreamMutation mutation = mutationPair.getValue();
  MutationType mutationType = mutation.getType();
  for (Entry entry : mutation.getEntries()) {
    if (entry instanceof SetCell) {
      mutations.add(setCellToString(rowKey, mutationType, (SetCell) entry));
    } else if (entry instanceof DeleteCells) {
      mutations.add(deleteCellsToString(rowKey, mutationType, (DeleteCells) entry));
    } else if (entry instanceof DeleteFamily) {
      // Note: DeleteRow mutations are mapped into one DeleteFamily per-family
      mutations.add(deleteFamilyToString(rowKey, mutationType, (DeleteFamily) entry));
    } else {
      throw new RuntimeException("Entry type not supported.");
    }
  }
  return mutations;
}

In questo esempio viene convertita una voce write:

private static String setCellToString(String rowKey, MutationType mutationType, SetCell setCell) {
  List<String> mutationParts =
      Arrays.asList(
          rowKey,
          mutationType.name(),
          "SetCell",
          setCell.getFamilyName(),
          setCell.getQualifier().toStringUtf8(),
          setCell.getValue().toStringUtf8());
  return String.join(",", mutationParts);
}

In questo esempio viene convertita una voce di eliminazione di celle:

private static String deleteCellsToString(
    String rowKey, MutationType mutationType, DeleteCells deleteCells) {
  String timestampRange =
      deleteCells.getTimestampRange().getStart() + "-" + deleteCells.getTimestampRange().getEnd();
  List<String> mutationParts =
      Arrays.asList(
          rowKey,
          mutationType.name(),
          "DeleteCells",
          deleteCells.getFamilyName(),
          deleteCells.getQualifier().toStringUtf8(),
          timestampRange);
  return String.join(",", mutationParts);
}

In questo esempio, una voce di eliminazione di una famiglia di colonne viene convertita:


private static String deleteFamilyToString(
    String rowKey, MutationType mutationType, DeleteFamily deleteFamily) {
  List<String> mutationParts =
      Arrays.asList(rowKey, mutationType.name(), "DeleteFamily", deleteFamily.getFamilyName());
  return String.join(",", mutationParts);
}

Monitoraggio

Le seguenti risorse nella console Google Cloud ti consentono di monitorare le tue risorseGoogle Cloud mentre esegui una pipeline Dataflow per leggere un flusso di modifiche di Bigtable:

In particolare, controlla le seguenti metriche:

  • Nella pagina degli approfondimenti sul sistema Bigtable, controlla le seguenti metriche:
    • Utilizzo della CPU per modifiche in tempo reale nei dati della metrica cpu_load_by_app_profile_by_method_by_table. Mostra l'impatto del flusso di modifiche sull'utilizzo della CPU del cluster.
    • Utilizzo dello spazio di archiviazione per le modifiche in tempo reale (byte) (change_stream_log_used_bytes).
  • Nella pagina di monitoraggio di Dataflow, controlla l'aggiornamento dei dati. Questa metrica mostra la differenza tra l'ora attuale e la filigrana, che è di circa due minuti, con picchi occasionali che durano un minuto o due in più. L'aggiornamento dei dati non indica se i record di modifica dei dati vengono elaborati lentamente. Per garantire l'integrità e le prestazioni continue delle tue applicazioni critiche, monitora la metrica di aggiornamento dei dati di Dataflow ed esegui le seguenti azioni:

    • Se la metrica di aggiornamento dei dati è costantemente superiore alla soglia, la pipeline potrebbe non avere risorse sufficienti. Ti consigliamo di aggiungere altri worker Dataflow.
    • Se i worker Dataflow sono di cui è stato eseguito il provisioning correttamente, ma la freschezza dei dati è aumentata o è costantemente elevata, contatta l'assistenzaGoogle Cloud .
  • La metrica processing_delay_from_commit_timestamp_MEAN di Dataflow può indicare il tempo medio di elaborazione dei record di modifica dei dati durante l'intero ciclo di vita del job.

La metrica server/latencies di Bigtable non è utile quando monitori una pipeline Dataflow che legge un flusso di modifiche Bigtable, perché riflette la durata della richiesta di streaming, non la latenza di elaborazione del record di modifica dei dati. Una latenza elevata in uno stream di modifiche non significa che le richieste vengano elaborate lentamente, ma che la connessione è rimasta aperta per tutto questo tempo.

Passaggi successivi