Trasmetti le modifiche con Dataflow

Il connettore Bigtable Beam ti consente di utilizzare Dataflow leggere i record delle modifiche dei dati di Bigtable senza dover tenere traccia o partizione dei processi modifiche perché è il connettore a gestire questa logica per te.

Questo documento descrive come configurare e utilizzare il connettore Bigtable Beam per leggere un flusso di modifiche utilizzando una pipeline Dataflow. Prima di leggere questo documento, ti consigliamo di leggere la Panoramica delle modifiche di streaming e di acquisire familiarità con Dataflow.

Alternative alla creazione di una pipeline personalizzata

Se non vuoi creare il tuo Dataflow pipeline, puoi utilizzare una delle seguenti opzioni.

Puoi utilizzare un modello Dataflow fornito da Google.

Puoi anche utilizzare gli esempi di codice del tutorial su Bigtable come punto di partenza per il tuo codice.

Assicurati che il codice generato utilizzi google cloud libraries-bom 26.14.0 o versioni successive.

Dettagli connettore

Il metodo del connettore Bigtable Beam, BigtableIO.readChangeStream, consente di leggere un flusso di dati record di modifiche (ChangeStreamMutation) che puoi elaborare. Il connettore Bigtable Beam è del componente Apache Beam GitHub un repository attendibile. Per una descrizione del codice del connettore, consulta i commenti all'indirizzo BigtableIO.java.

È necessario utilizzare il connettore con Beam versione 2.48.0 o successiva. Controlla Apache Beam runtime per assicurare che stai utilizzando una versione di Java supportata. Poi puoi eseguire il deployment di una pipeline utilizza il connettore per Dataflow, che gestisce il provisioning e la gestione. di risorse e aiuta la scalabilità e l'affidabilità dei flussi di dati e l'elaborazione dei dati.

Per ulteriori informazioni sul modello di programmazione Apache Beam, consulta Documentazione relativa a Beam.

Raggruppamento dei dati senza data e ora evento

I record delle modifiche dei dati trasmessi in streaming utilizzando il connettore Bigtable Beam non sono compatibili con Dataflow che dipendono dagli orari degli eventi.

Come spiegato in Replica e filigrane, un la filigrana in esaurimento potrebbe non avanzare se la replica della partizione non è stata rilevata al resto dell'istanza. Quando una filigrana bassa smette di avanzare, può il flusso di modifiche si blocca.

Per evitare che il flusso si blocchi, il connettore Bigtable Beam produce tutti i dati con un timestamp di output zero. Il timestamp zero fa sì che Dataflow consideri tutti i dati modifica i record in modo che dati in ritardo. Di conseguenza, le funzionalità di Dataflow che dipendono dagli orari degli eventi compatibile con le modifiche in tempo reale di Bigtable. Nello specifico, non puoi utilizzare funzioni di windowing, Trigger all'ora dell'evento, o timer all'ora dell'evento.

Puoi utilizzare invece GlobalWindows con trigger basati sull'ora non dell'evento per raggruppare questi dati tardivi in riquadri, come dimostrato nell'esempio del tutorial. Per maggiori dettagli su trigger e riquadri, consulta Trigger nella guida alla programmazione Beam.

Scalabilità automatica

Il connettore supporta Scalabilità automatica Dataflow, che è abilitato per impostazione predefinita quando Runner v2 (obbligatorio). L'algoritmo di scalabilità automatica di Dataflow tiene conto il backlog delle modifiche in tempo reale, che può essere monitorato Monitoraggio di Dataflow nella sezione Backlog. Usa il flag --maxNumWorkers durante il deployment di per limitare il numero di worker.

Per scalare manualmente la pipeline anziché utilizzare la scalabilità automatica, consulta Scalabilità manuale di una pipeline in modalità flusso.

Limitazioni

Nota le seguenti limitazioni prima di utilizzare il connettore Bigtable Beam con Dataflow.

Esecutore Dataflow V2

Il connettore può essere eseguito solo utilizzando Dataflow Runner v2. Per abilitare questa funzionalità, specifica --experiments=use_runner_v2 nella riga di comando argomenti. L'esecuzione con Runner v1 provoca l'errore della pipeline con seguente eccezione:

java.lang.UnsupportedOperationException: BundleFinalizer unsupported by non-portable Dataflow

Snapshot

Il connettore non supporta Snapshot di Dataflow.

Duplicati

Il connettore Bigtable Beam esegue il flusso di modifiche per ogni chiave di riga e nell'ordine del timestamp di commit, ma poiché a volte si riavvia precedenti nello stream, può produrre duplicati.

Prima di iniziare

Prima di utilizzare il connettore, completa i seguenti prerequisiti.

Configura l'autenticazione

Per utilizzare gli Java esempi in questa pagina in una località dell'ambiente di sviluppo, installare e inizializzare gcloud CLI quindi configura Credenziali predefinite dell'applicazione con le tue credenziali utente.

  1. Installa Google Cloud CLI.
  2. Per initialize gcloud CLI, esegui questo comando:

    gcloud init
  3. Crea credenziali di autenticazione locali per il tuo Account Google:

    gcloud auth application-default login

Per ulteriori informazioni, vedi Set up authentication for a local development environment.

Per informazioni sulla configurazione dell'autenticazione per un ambiente vedi l'ambiente Set up Application Default Credentials for code running on Google Cloud.

Attivare un flusso di modifiche

Devi abilitare una modifica in tempo reale su una tabella prima di poterla leggere. Puoi anche crea una nuova tabella con le modifiche in tempo reale abilitate.

Tabella dei metadati delle modifiche in tempo reale

Quando trasmetti le modifiche in modalità flusso con Dataflow, Il connettore Bigtable Beam crea una tabella di metadati denominata __change_stream_md_table per impostazione predefinita. La tabella dei metadati delle modifiche in tempo reale gestisce lo stato operativo del connettore e archivia i metadati relativi alle modifiche dei dati record.

Per impostazione predefinita, il connettore crea la tabella nella stessa istanza della tabella viene riprodotta in streaming. Per garantire che la tabella funzioni correttamente, il profilo dell'app per la la tabella di metadati deve utilizzare il routing a cluster singolo e avere una riga singola transazioni abilitate.

Per saperne di più sulle modifiche in modalità flusso da Bigtable con del connettore Bigtable Beam, consulta la sezione BigtableIO documentazione.

Ruoli obbligatori

Per ottenere le autorizzazioni necessarie per leggere una modifica di Bigtable utilizzando Dataflow, chiedi all'amministratore di concederti i seguenti ruoli IAM.

Per leggere le modifiche da Bigtable, hai bisogno di questo ruolo:

  • Amministratore Bigtable (roles/bigtable.admin) nell'istanza Bigtable che contiene la tabella che prevedi modifiche al flusso da

Per eseguire il job di Dataflow, devi disporre di questi ruoli:

Per ulteriori informazioni sulla concessione dei ruoli, consulta Gestisci accesso.

Potresti anche riuscire a ottenere le autorizzazioni richieste ruoli personalizzati o altro ruoli predefiniti.

Aggiungere il connettore Bigtable Beam come dipendenza

Aggiungi al file Maven pom.xml un codice simile alla seguente dipendenza. La deve essere 2.48.0 o successiva.

<dependencies>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
    <version>VERSION</version>
  </dependency>
</dependencies>

Leggi il flusso di modifiche

Per creare una pipeline Dataflow per leggere i record delle modifiche dei dati, configuri il connettore e aggiungi trasformazioni e sink. Quindi utilizza per leggere ChangeStreamMutation oggetti in una pipeline Beam.

Gli esempi di codice in questa sezione, scritti in Java, mostrano come creare un e usarla per convertire coppie chiave-valore in una stringa. Ogni coppia è composta di una chiave di riga e di un oggetto ChangeStreamMutation. La pipeline converte ogni le voci dell'oggetto in una stringa separata da virgole.

Crea la pipeline

Questo esempio di codice Java mostra come creare la pipeline:

BigtableOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
Pipeline p = Pipeline.create(options);

final Instant startTime = Instant.now();

p.apply(
        "Read Change Stream",
        BigtableIO.readChangeStream()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .withAppProfileId(options.getBigtableAppProfile())
            .withStartTime(startTime))
    .apply(
        "Flatten Mutation Entries",
        FlatMapElements.into(TypeDescriptors.strings())
            .via(ChangeStreamsHelloWorld::mutationEntriesToString))
    .apply(
        "Print mutations",
        ParDo.of(
            new DoFn<String, Void>() { // a DoFn as an anonymous inner class instance
              @ProcessElement
              public void processElement(@Element String mutation) {
                System.out.println("Change captured: " + mutation);
              }
            }));
p.run();

Elaborare i record delle modifiche dei dati

Questo esempio mostra come eseguire il loop di tutte le voci in un record delle modifiche ai dati per una riga e richiamare un metodo di conversione in stringa in base al tipo di voce.

Per un elenco dei tipi di voce che possono essere contenuti in un record delle modifiche dei dati, consulta: Contenuto di un record delle modifiche dei dati.

static List<String> mutationEntriesToString(KV<ByteString, ChangeStreamMutation> mutationPair) {
  List<String> mutations = new ArrayList<>();
  String rowKey = mutationPair.getKey().toStringUtf8();
  ChangeStreamMutation mutation = mutationPair.getValue();
  MutationType mutationType = mutation.getType();
  for (Entry entry : mutation.getEntries()) {
    if (entry instanceof SetCell) {
      mutations.add(setCellToString(rowKey, mutationType, (SetCell) entry));
    } else if (entry instanceof DeleteCells) {
      mutations.add(deleteCellsToString(rowKey, mutationType, (DeleteCells) entry));
    } else if (entry instanceof DeleteFamily) {
      // Note: DeleteRow mutations are mapped into one DeleteFamily per-family
      mutations.add(deleteFamilyToString(rowKey, mutationType, (DeleteFamily) entry));
    } else {
      throw new RuntimeException("Entry type not supported.");
    }
  }
  return mutations;
}

In questo esempio viene convertita una voce write:

private static String setCellToString(String rowKey, MutationType mutationType, SetCell setCell) {
  List<String> mutationParts =
      Arrays.asList(
          rowKey,
          mutationType.name(),
          "SetCell",
          setCell.getFamilyName(),
          setCell.getQualifier().toStringUtf8(),
          setCell.getValue().toStringUtf8());
  return String.join(",", mutationParts);
}

In questo esempio viene convertita una voce relativa all'eliminazione di celle:

private static String deleteCellsToString(
    String rowKey, MutationType mutationType, DeleteCells deleteCells) {
  String timestampRange =
      deleteCells.getTimestampRange().getStart() + "-" + deleteCells.getTimestampRange().getEnd();
  List<String> mutationParts =
      Arrays.asList(
          rowKey,
          mutationType.name(),
          "DeleteCells",
          deleteCells.getFamilyName(),
          deleteCells.getQualifier().toStringUtf8(),
          timestampRange);
  return String.join(",", mutationParts);
}

In questo esempio viene convertita l'eliminazione di una famiglia di colonne:


private static String deleteFamilyToString(
    String rowKey, MutationType mutationType, DeleteFamily deleteFamily) {
  List<String> mutationParts =
      Arrays.asList(rowKey, mutationType.name(), "DeleteFamily", deleteFamily.getFamilyName());
  return String.join(",", mutationParts);
}

Monitoraggio

Le seguenti risorse nella console Google Cloud ti consentono di monitorare alle risorse Google Cloud durante l'esecuzione di una pipeline Dataflow legge una modifica in tempo reale di Bigtable:

In particolare, controlla le seguenti metriche:

  • Nella pagina Monitoring di Bigtable, controlla questi metrics:
    • Utilizzo della CPU per flussi di modifiche nella metrica cpu_load_by_app_profile_by_method_by_table. Mostra le variazioni del flusso di modifiche sull'utilizzo della CPU da parte del cluster.
    • Modifica dell'utilizzo dello spazio di archiviazione per le modifiche in tempo reale (byte) (change_stream_log_used_bytes).
  • Nella pagina Monitoraggio di Dataflow, controlla Dati aggiornamento, che mostra la differenza tra l'ora corrente e filigrana. La durata dovrebbe essere di circa due minuti, con picchi occasionali qualche minuto in più. Se la metrica di aggiornamento dei dati è costantemente più elevata oltre questa soglia, è probabile che la tua pipeline non disponga di risorse sufficienti e devono aggiungere altri worker Dataflow. L'aggiornamento dei dati indicano se i record delle modifiche dei dati vengono elaborati lentamente.
  • Dataflow: processing_delay_from_commit_timestamp_MEAN può indicare il tempo medio di elaborazione dei record delle variazioni dei dati nel corso per tutta la durata del job.

La metrica server/latencies di Bigtable non è utile quando di una pipeline Dataflow che legge una pipeline Dataflow Modifiche in tempo reale Bigtable, perché riflettono la richiesta di flussi di dati non la latenza nell'elaborazione del record delle modifiche dei dati. Latenza elevata in un flusso di modifiche non significa che le richieste vengono elaborate lentamente; significa che la connessione sia rimasta aperta per quel tempo.

Passaggi successivi