Sviluppa e testa le pipeline Dataflow

Questa pagina fornisce le best practice per sviluppare e testare la pipeline Dataflow.

Panoramica

Il modo in cui viene implementato il codice della pipeline ha un'influenza significativa sulle prestazioni della pipeline in produzione. Per aiutarti a creare un codice pipeline che funzioni in modo corretto ed efficiente, questo documento spiega quanto segue:

  • runner di pipeline per supportare l'esecuzione di codice nelle diverse fasi di sviluppo e deployment.
  • Ambienti di deployment che consentono di eseguire pipeline durante sviluppo, test, pre-produzione e produzione.
  • Modelli e codice di pipeline open source che puoi utilizzare così come sono o come base per nuove pipeline per accelerare lo sviluppo del codice.
  • Un approccio basato sulle best practice per testare il codice della pipeline. In primo luogo, questo documento fornisce una panoramica che include l'ambito e la relazione di diversi tipi di test, come test delle unità, test di integrazione e test end-to-end. In secondo luogo, ogni tipo di test viene esplorato in dettaglio, compresi i metodi per creare e integrare i dati di test e i runner della pipeline da utilizzare per ogni test.

runner della pipeline

Durante lo sviluppo e il test, utilizzi diversi runner Apache Beam per eseguire il codice della pipeline. L'SDK Apache Beam fornisce un esecutore diretto per lo sviluppo e i test locali. Gli strumenti di automazione delle release possono usare l'esecuzione diretta anche per i test delle unità e di integrazione. Ad esempio, puoi utilizzare l'esecuzione diretta all'interno della tua pipeline di integrazione continua (CI).

Le pipeline di cui viene eseguito il deployment in Dataflow utilizzano Dataflow Runner, che esegue la pipeline in ambienti simili a quelli di produzione. Inoltre, puoi utilizzare Dataflow per i test di sviluppo ad hoc e per i test end-to-end della pipeline.

Sebbene questa pagina sia incentrata sull'esecuzione di pipeline create utilizzando l'SDK Java Apache Beam, Dataflow supporta anche le pipeline Apache Beam sviluppate utilizzando Python e Go. Gli SDK Apache Beam Java, Python e Go sono disponibili a livello generale per Dataflow. Gli sviluppatori SQL possono utilizzare anche Apache Beam SQL per creare pipeline che utilizzano dialetti SQL familiari.

configura un ambiente di deployment

Per separare utenti, dati, codice e altre risorse in diverse fasi di sviluppo, crea ambienti di deployment. Quando possibile, per fornire ambienti isolati per le diverse fasi di sviluppo delle pipeline, utilizza progetti Google Cloud separati.

Le sezioni seguenti descrivono un insieme tipico di ambienti di deployment.

Ambiente locale

L'ambiente locale è la workstation di uno sviluppatore. Per lo sviluppo e i test rapidi, utilizza l'esecuzione diretta per eseguire il codice della pipeline in locale.

Le pipeline eseguite in locale utilizzando l'esecuzione diretta possono interagire con risorse Google Cloud remote, come argomenti Pub/Sub o tabelle BigQuery. Fornisci ai singoli sviluppatori progetti Google Cloud separati in modo da avere una sandbox per test ad hoc con i servizi Google Cloud.

Alcuni servizi Google Cloud, come Pub/Sub e Bigtable, forniscono emulatori per lo sviluppo locale. Puoi usare questi emulatori con Direct Runner per abilitare lo sviluppo e i test locali end-to-end.

Ambiente sandbox

L'ambiente sandbox è un progetto Google Cloud che consente agli sviluppatori di accedere ai servizi Google Cloud durante lo sviluppo del codice. Gli sviluppatori di pipeline possono condividere un progetto Google Cloud con altri sviluppatori o utilizzare i propri progetti individuali. L'utilizzo di singoli progetti riduce la complessità di pianificazione relativa all'utilizzo condiviso delle risorse e alla gestione delle quote.

Gli sviluppatori utilizzano l'ambiente sandbox per eseguire l'esecuzione di pipeline ad hoc con Dataflow Runner. L'ambiente sandbox è utile per il debug e il test del codice rispetto a un runner di produzione durante la fase di sviluppo del codice. Ad esempio, l'esecuzione di pipeline ad hoc consente agli sviluppatori di:

  • Osserva l'effetto delle modifiche al codice sul comportamento di scalabilità.
  • Comprendi le potenziali differenze tra il comportamento dell'esecutore diretto e di Dataflow.
  • Scopri in che modo Dataflow applica le ottimizzazioni dei grafici.

Per i test ad hoc, gli sviluppatori possono eseguire il deployment del codice dall'ambiente locale per eseguire Dataflow nell'ambiente sandbox.

Ambiente di pre-produzione

L'ambiente di preproduzione è destinato alle fasi di sviluppo che devono essere eseguite in condizioni simili alla produzione, come i test end-to-end. Utilizza un progetto separato per l'ambiente di pre-produzione e configuralo in modo che sia il più simile possibile a quello di produzione. Analogamente, per consentire test end-to-end con scalabilità simile alla produzione, imposta le quote dei progetti Google Cloud per Dataflow e altri servizi il più simili possibile all'ambiente di produzione.

A seconda delle tue esigenze, puoi separare ulteriormente la preproduzione in più ambienti. Ad esempio, un ambiente di controllo della qualità può supportare il lavoro degli analisti della qualità per testare gli obiettivi del livello di servizio (SLO), come la correttezza dei dati, l'aggiornamento e le prestazioni in condizioni di carico di lavoro diverse.

I test end-to-end includono l'integrazione con origini dati e sink nell'ambito dei test. Considera come renderli disponibili nell'ambiente di pre-produzione. Puoi archiviare i dati di test nell'ambiente di pre-produzione stesso. Ad esempio, i dati di test vengono archiviati in un bucket Cloud Storage con i tuoi dati di input. In altri casi, i dati di test potrebbero provenire dall'esterno dell'ambiente di pre-produzione, ad esempio un argomento Pub/Sub tramite un abbonamento separato nell'ambiente di produzione. Per le pipeline in modalità flusso, puoi anche eseguire test end-to-end utilizzando dati generati, ad esempio utilizzando il generatore di flussi di dati Dataflow per emulare volumi e caratteristiche di dati di tipo produzione.

Per le pipeline in modalità flusso, utilizza l'ambiente di pre-produzione per testare gli aggiornamenti della pipeline prima di apportare qualsiasi modifica alla produzione. È importante testare e verificare le procedure di aggiornamento per le pipeline in modalità flusso, in particolare se devi coordinare più passaggi, come l'esecuzione di pipeline parallele per evitare tempi di inattività.

Ambiente di produzione

L'ambiente di produzione è un progetto Google Cloud dedicato. La distribuzione continua copia gli artefatti di deployment nell'ambiente di produzione una volta superati tutti i test end-to-end.

Best practice per lo sviluppo

Consulta le best practice per la pipeline Dataflow.

Testare la pipeline

Nello sviluppo del software, i test delle unità, i test di integrazione e i test end-to-end sono tipi comuni di test del software. Questi tipi di test sono applicabili anche alle pipeline di dati.

L'SDK Apache Beam fornisce le funzionalità per abilitare questi test. Idealmente, ogni tipo di test ha come target un ambiente di deployment diverso. Il seguente schema illustra in che modo i test delle unità, i test di integrazione e i test end-to-end vengono applicati a parti diverse della pipeline e dei dati.

I tipi di test e la loro relazione con trasformazioni, pipeline, origini dati e sink di dati.

Il diagramma mostra l'ambito dei diversi test e la loro relazione a trasformazioni (sottoclassi DoFn e PTransform), pipeline, origini dati e sink di dati.

Le seguenti sezioni descrivono in che modo i vari test software formali vengono applicati alle pipeline di dati utilizzando Dataflow. Leggendo questa sezione, fai riferimento al diagramma per capire la correlazione tra i diversi tipi di test.

Campionamento dei dati

Per osservare i dati in ogni passaggio di una pipeline Dataflow, abilita il campionamento dei dati durante il test. Questo ti consente di visualizzare gli output delle trasformazioni, per assicurarti che l'output sia corretto.

Test delle unità

I test delle unità valutano il corretto funzionamento delle sottoclassi DoFn e delle trasformazioni composte (PTransform sottoclassi) confrontando l'output di queste trasformazioni con un insieme verificato di input e output di dati. In genere, gli sviluppatori possono eseguire questi test nell'ambiente locale. I test possono anche essere eseguiti automaticamente tramite l'automazione di test delle unità utilizzando l'integrazione continua (CI) nell'ambiente di build.

L'esecutore diretto esegue test delle unità utilizzando un sottoinsieme più ridotto di dati di test di riferimento incentrati sui test della logica di business delle trasformazioni. I dati di test devono essere sufficientemente piccoli da poter essere inseriti nella memoria locale della macchina che esegue il test.

L'SDK Apache Beam fornisce una regola JUnit denominata TestPipeline per il test delle unità singole trasformazioni (DoFn sottoclassi), trasformazioni composte (PTransform sottoclassi) e intere pipeline. Puoi utilizzare TestPipeline su un runner della pipeline Apache Beam come Direct Runner o Dataflow Runner per applicare asserzioni sui contenuti degli oggetti PCollection utilizzando PAssert, come mostrato nel seguente snippet di codice di una classe di test JUnit:

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void myPipelineTest() throws Exception {
  final PCollection<String> pcol = p.apply(...)
  PAssert.that(pcol).containsInAnyOrder(...);
  p.run();
}

Test delle unità per singole trasformazioni

Scomponendo il codice in trasformazioni riutilizzabili, ad esempio come classi nidificate di primo livello o statiche, puoi creare test mirati per diverse parti della pipeline. Oltre ai vantaggi dei test, le trasformazioni riutilizzabili migliorano la manutenibilità e la riutilizzabilità del codice integrando naturalmente la logica di business della tua pipeline in parti componenti. Al contrario, testare singole parti della pipeline potrebbe essere difficile se la pipeline utilizza classi interne anonime per implementare le trasformazioni.

Il seguente snippet Java mostra l'implementazione delle trasformazioni come classi interne anonime, che non consentono di eseguire facilmente i test.

PipelineOptions options = PipelineOptionsFactory.create();

Pipeline p = Pipeline.create(options)

PCollection<Integer> output =
    p.apply("Read from text", TextIO.Read.from(...))
        .apply("Split words", ParDo.of(new DoFn() {
          // Untestable anonymous transform 1
        }))
        .apply("Generate anagrams", ParDo.of(new DoFn() {
          // Untestable anonymous transform 2
        }))
        .apply("Count words", Count.perElement());

Confronta l'esempio precedente con quello seguente, in cui le classi interne anonime vengono sottoposte a refactoring in sottoclassi di calcestruzzo DoFn denominate. Puoi creare singole unità test per ogni sottoclasse DoFn di cemento che compongono la pipeline end-to-end.

PipelineOptions options = PipelineOptionsFactory.create();

Pipeline p = Pipeline.create(options)

PCollection<Integer> output =
    p.apply("Read from text", TextIO.Read.from(...))
        .apply("Split words", ParDo.of(new SplitIntoWordsFn()))
        .apply("Generate anagrams", ParDo.of(new GenerateAnagramsFn()))
        .apply("Count words", Count.perElement());

Il test di ogni sottoclasse DoFn è simile al test delle unità di una pipeline batch che contiene una singola trasformazione. Utilizza la trasformazione Create per creare un oggetto PCollection di dati di test, quindi passalo all'oggetto DoFn. Usa PAssert per dichiarare che i contenuti dell'oggetto PCollection sono corretti. Il seguente esempio di codice Java utilizza la classe PAssert per verificare il modulo di output corretto.

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void testGenerateAnagramsFn() {
    // Create the test input
    PCollection<String> words = p.apply(Create.of("friend"));

    // Test a single DoFn using the test input
    PCollection<String> anagrams =
        words.apply("Generate anagrams", ParDo.of(new GenerateAnagramsFn()));

    // Assert correct output from
    PAssert.that(anagrams).containsInAnyOrder(
        "finder", "friend", "redfin", "refind");

    p.run();
}

Test di integrazione

I test di integrazione verificano il corretto funzionamento dell'intera pipeline. Prendi in considerazione i seguenti tipi di test di integrazione:

  • Un test di integrazione della trasformazione che valuta la funzionalità integrata di tutte le singole trasformazioni che compongono la tua pipeline di dati. Pensa ai test di integrazione della trasformazione come a un test delle unità per l'intera pipeline, escludendo l'integrazione con origini dati e sink esterni. L'SDK Apache Beam fornisce metodi per fornire dati di test alla pipeline di dati e verificare i risultati dell'elaborazione. L'esecuzione diretta viene utilizzata per eseguire i test di integrazione della trasformazione.
  • Un test di integrazione del sistema che valuta l'integrazione della pipeline di dati con origini dati e sink in tempo reale. Affinché la tua pipeline comunichi con i sistemi esterni, devi configurare i test con le credenziali appropriate per accedere ai servizi esterni. Le pipeline in modalità flusso vengono eseguite a tempo indeterminato, quindi devi decidere quando e come arrestare la pipeline in esecuzione. Utilizzando l'esecutore diretto per eseguire i test di integrazione del sistema, puoi verificare rapidamente l'integrazione tra la tua pipeline e altri sistemi senza dover inviare un job Dataflow e attendere il completamento.

Progetta i test di trasformazione e di integrazione dei sistemi in modo da fornire feedback e rilevamento rapidi dei difetti senza rallentare la produttività degli sviluppatori. Per i test più lunghi, come quelli eseguiti come job Dataflow, potresti voler utilizzare un test end-to-end eseguito con minore frequenza.

Pensa a una pipeline di dati come a una o più trasformazioni correlate. Puoi creare una trasformazione composita incapsulante per la tua pipeline e utilizzare TestPipeline per eseguire un test di integrazione dell'intera pipeline. A seconda che tu voglia testare la pipeline in modalità batch o flusso, puoi fornire i dati di test utilizzando le trasformazioni Create o TestStream.

Utilizzare i dati dei test per i test di integrazione

È probabile che la pipeline si integri con origini dati e sink diversi nel tuo ambiente di produzione. Tuttavia, per i test delle unità e di integrazione della trasformazione, concentrati sulla verifica della logica di business del codice della pipeline fornendo input di test e verificando direttamente l'output. Oltre a semplificare i test, questo approccio consente di isolare i problemi specifici della pipeline da quelli che potrebbero essere causati da origini dati e sink.

Testa pipeline batch

Per le pipeline in modalità batch, utilizza la trasformazione Create per creare un oggetto PCollection dei tuoi dati di test di input da una raccolta in memoria standard, come un oggetto List Java. L'uso della trasformazione Create è appropriato se i dati di test sono sufficientemente piccoli da essere inclusi nel codice. Puoi quindi utilizzare PAssert sugli oggetti PCollection di output per determinare la correttezza del codice della pipeline. Questo approccio è supportato dall'esecutore diretto e dall'esecutore Dataflow.

Il seguente snippet di codice Java mostra le asserzioni rispetto agli oggetti PCollection di output di una trasformazione composita che include alcune o tutte le singole trasformazioni che costituiscono una pipeline (WeatherStatsPipeline). L'approccio è simile al test delle unità di singole trasformazioni in una pipeline.

private class WeatherStatsPipeline extends
    PTransform<PCollection<Integer>, PCollection<WeatherSummary>> {
  @Override
  public PCollection<WeatherSummary> expand(PCollection<Integer> input) {
    // Pipeline transforms …
  }
}

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void testWeatherPipeline() {
  // Create test input consisting of temperature readings
  PCollection<Integer> tempCelsius =
      p.apply(Create.of(24, 22, 20, 22, 21, 21, 20));

  // CalculateWeatherStats calculates the min, max, and average temperature
  PCollection<WeatherSummary> result =
      tempCelsius.apply("Calculate weather statistics", new WeatherStatsPipeline());

   // Assert correct output from CalculateWeatherStats
   PAssert.thatSingleton(result).isEqualTo(new WeatherSummary.Builder()
       .withAverageTemp(21)
       .withMaxTemp(24)
       .withMinTemp(20)
       .build());

   p.run();
}

Per verificare il comportamento delle finestre, puoi anche utilizzare la trasformazione Create per creare elementi con timestamp, come mostrato nel seguente snippet di codice:

private static final Duration WINDOW_DURATION = Duration.standardMinutes(3);

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void testWindowedData() {
    PCollection<String> input =
        p.apply(
            Create.timestamped(
                    TimestampedValue.of("a", new Instant(0L)),
                    TimestampedValue.of("a", new Instant(0L)),
                    TimestampedValue.of("b", new Instant(0L)),
                    TimestampedValue.of("c", new Instant(0L)),
                    TimestampedValue.of("c", new Instant(0L).plus(WINDOW_DURATION)))
                .withCoder(StringUtf8Coder.of()));

   PCollection<KV<String, Long>> windowedCount =
       input
           .apply(Window.into(FixedWindows.of(WINDOW_DURATION)))
           .apply(Count.perElement());

    PAssert.that(windowedCount)
        .containsInAnyOrder(
            // Output from first window
            KV.of("a", 2L),
            KV.of("b", 1L),
            KV.of("c", 1L),
            // Output from second window
            KV.of("c", 1L));

   p.run();
}

Test delle pipeline in modalità flusso

Le pipeline in modalità flusso contengono ipotesi che definiscono la modalità di gestione dei dati illimitati. Queste ipotesi riguardano spesso la tempestività dei dati in condizioni reali e, di conseguenza, hanno un impatto sulla correttezza a seconda che le ipotesi siano vere o false. I test di integrazione per le pipeline in modalità flusso includono idealmente test che simulano la natura non deterministica dell'arrivo dei flussi di dati.

Per abilitare questi test, l'SDK Apache Beam fornisce la classe TestStream per modellare gli effetti delle sincronizzazione degli elementi (dati in anticipo, puntuali o tardivi) sui risultati della pipeline di dati. Utilizza questi test insieme alla classe PAssert per eseguire la verifica in base ai risultati previsti.

TestStream è supportato dall'esecuzione diretta e da Dataflow. Il seguente esempio di codice crea una trasformazione TestStream:

final Duration WINDOW_DURATION = Duration.standardMinutes(3);

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void testDroppedLateData() {
   TestStream<String> input = TestStream.create(StringUtf8Coder.of())
      // Add elements arriving before the watermark
      .addElements(
         TimestampedValue.of("a", new Instant(0L)),
         TimestampedValue.of("a", new Instant(0L)),
         TimestampedValue.of("b", new Instant(0L)),
         TimestampedValue.of("c", new Instant(0L).plus(Duration.standardMinutes(3))))
         // Advance the watermark past the end of the window
      .advanceWatermarkTo(new Instant(0L).plus(WINDOW_DURATION).plus(Duration.standardMinutes(1)))
      // Add elements which will be dropped due to lateness
      .addElements(
         TimestampedValue.of("c", new Instant(0L)))
      // Advance the watermark to infinity which will close all windows
      .advanceWatermarkToInfinity();

      PCollection<KV<String, Long>> windowedCount =
          p.apply(input)
             .apply(Window.into(FixedWindows.of(WINDOW_DURATION)))
             .apply(Count.perElement());

   PAssert.that(windowedCount)
      .containsInAnyOrder(
          // Output from first window
          KV.of("a", 2L),
          KV.of("b", 1L),
          KV.of("c", 1L));

   p.run();
}

Per maggiori informazioni su TestStream, consulta Test di pipeline illimitate in Apache Beam. Per ulteriori informazioni su come utilizzare l'SDK Apache Beam per il test delle unità, consulta la documentazione di Apache Beam.

Utilizzare i servizi Google Cloud nei test di integrazione

L'eseguitore diretto può integrarsi con i servizi Google Cloud, di conseguenza i test ad hoc nell'ambiente locale e i test di integrazione del sistema possono utilizzare Pub/Sub, BigQuery e altri servizi in base alle esigenze. Quando utilizzi l'esecuzione diretta, la pipeline viene eseguita come l'account utente configurato utilizzando lo strumento a riga di comando gcloud o come account di servizio specificato utilizzando la variabile di ambiente GOOGLE_APPLICATION_CREDENTIALS. Pertanto, devi concedere autorizzazioni sufficienti a questo account per tutte le risorse richieste prima di eseguire la pipeline. Per maggiori dettagli, consulta Sicurezza e autorizzazioni di Dataflow.

Per test di integrazione interamente locali, puoi utilizzare emulatori locali per alcuni servizi Google Cloud. Sono disponibili emulatori locali per Pub/Sub e Bigtable.

Per i test di integrazione del sistema delle pipeline in modalità flusso, puoi usare il metodo setBlockOnRun (definito nell'interfaccia DirectOptions) per fare in modo che l'esecutore diretto esegua la pipeline in modo asincrono. In caso contrario, l'esecuzione della pipeline blocca il processo padre chiamante (ad esempio, uno script nella pipeline di build) fino a quando la pipeline non viene arrestata manualmente. Se esegui la pipeline in modo asincrono, puoi utilizzare l'istanza PipelineResult restituita per annullare l'esecuzione della pipeline, come mostrato nel seguente esempio di codice:

public interface StreamingIntegrationTestOptions extends
   DirectOptions, StreamingOptions, MyOtherPipelineOptions {
   ...
}

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void testNonBlockingPipeline() {
    StreamingIntegrationTestOptions options =
        p.getOptions().as(StreamingIntegrationOptions.class);

    options.setBlockOnRun(false); // Set non-blocking pipeline execution
    options.setStreaming(true); // Set streaming mode

    p.apply(...); // Apply pipeline transformations

    PipelineResult result = p.run(); // Run the pipeline

    // Generate input, verify output, etc
    ...

    // Later on, cancel the pipeline using the previously returned
    result.cancel();
}

Test end-to-end

I test end-to-end verificano il corretto funzionamento della pipeline end-to-end eseguendola su Dataflow Runner in condizioni simili alla produzione. I test verificano che la logica di business funzioni correttamente mediante l'esecuzione di Dataflow e verificano se la pipeline si comporta come previsto nei caricamenti di tipo produzione. In genere, si eseguono test end-to-end in un progetto Google Cloud dedicato progettato come ambiente di pre-produzione.

Per testare la pipeline su scale diverse, utilizza tipi diversi di test end-to-end, ad esempio:

  • Esegui test end-to-end su scala ridotta utilizzando una piccola proporzione (ad esempio l'1%) del set di dati di test per convalidare rapidamente la funzionalità della pipeline nell'ambiente di pre-produzione.
  • Esegui test end-to-end su larga scala utilizzando un set di dati di test completo per convalidare la funzionalità della pipeline in condizioni e volumi di dati simili a quelli di produzione.

Per le pipeline in modalità flusso, ti consigliamo di eseguire pipeline di test in parallelo alla pipeline di produzione se possono utilizzare gli stessi dati. Questo processo consente di confrontare risultati e comportamento operativo, come scalabilità automatica e prestazioni.

I test end-to-end consentono di prevedere in che misura la pipeline soddisferà gli SLO di produzione. L'ambiente di pre-produzione testa la pipeline in condizioni simili a quelle di produzione. All'interno di test end-to-end, le pipeline vengono eseguite utilizzando Dataflow Runner per elaborare set di dati di riferimento completi che corrispondono o assomigliano a set di dati in produzione.

Potrebbe non essere possibile generare dati sintetici per test che simulano accuratamente dati reali. Per risolvere questo problema, un approccio è utilizzare estrazioni pulite dalle origini dati di produzione per creare set di dati di riferimento, in cui tutti i dati sensibili vengono anonimizzati tramite trasformazioni appropriate. A questo scopo, ti consigliamo di utilizzare Sensitive Data Protection. Sensitive Data Protection può rilevare i dati sensibili da una serie di tipi di contenuti e origini dati e applicare una serie di tecniche di anonimizzazione, tra cui oscuramento, mascheramento, crittografia con protezione del formato e cambio di data.

Differenze nei test end-to-end per pipeline in modalità batch e flusso

Prima di eseguire un test end-to-end completo su un set di dati di test di grandi dimensioni, potrebbe essere opportuno eseguire un test con una percentuale inferiore dei dati di test (ad esempio l'uno per cento) e verificare il comportamento previsto in un periodo di tempo più breve. Come per i test di integrazione che utilizzano l'esecuzione diretta, puoi utilizzare PAssert su PCollection oggetti quando esegui pipeline utilizzando l'esecuzione di Dataflow. Per ulteriori informazioni su PAssert, consulta la sezione Test delle unità in questa pagina.

A seconda del caso d'uso, verificare un output di grandi dimensioni da test end-to-end potrebbe non essere attuabile, costoso o comunque impegnativo. In questo caso, puoi verificare i campioni rappresentativi del set di risultati di output. Ad esempio, puoi utilizzare BigQuery per campionare e confrontare le righe di output con un set di dati di riferimento dei risultati previsti.

Per le pipeline in modalità flusso, simulare condizioni di flusso realistiche con dati sintetici potrebbe essere difficile. Un modo comune per fornire dati in modalità flusso per i test end-to-end è integrare i test con le origini dati di produzione. Se utilizzi Pub/Sub come origine dati, puoi attivare uno stream di dati separato per i test end-to-end tramite sottoscrizioni aggiuntive ad argomenti esistenti. Puoi quindi confrontare i risultati di pipeline diverse che consumano gli stessi dati. Questa funzionalità è utile per verificare le pipeline candidati rispetto ad altre pipeline di pre-produzione e produzione.

Il seguente diagramma mostra come questo metodo consente l'esecuzione in parallelo di una pipeline di produzione e di test in diversi ambienti di deployment.

Esecuzione di una pipeline di test in parallelo con una pipeline di produzione utilizzando una singola origine di inserimento di flussi Pub/Sub.

Nel diagramma, entrambe le pipeline leggono dallo stesso argomento Pub/Sub, ma usano sottoscrizioni separate. Questa configurazione consente alle due pipeline di elaborare gli stessi dati in modo indipendente e di confrontare i risultati. La pipeline di test utilizza un account di servizio separato da quello del progetto di produzione, quindi evita di utilizzare la quota di sottoscrittore Pub/Sub per il progetto di produzione.

A differenza delle pipeline in modalità batch, le pipeline in modalità flusso continuano a essere eseguite fino a quando non vengono esplicitamente annullate. Nei test end-to-end, devi decidere se lasciare in esecuzione la pipeline, magari fino all'esecuzione del successivo test end-to-end, o annullare la pipeline in un punto che rappresenti il completamento del test, in modo da poter esaminare i risultati.

Il tipo di dati di test che utilizzi influisce su questa decisione. Ad esempio, se utilizzi un insieme limitato di dati di test fornito alla pipeline in modalità flusso, potresti annullare la pipeline al termine dell'elaborazione per tutti gli elementi. In alternativa, se utilizzi un'origine dati reale, ad esempio un argomento Pub/Sub esistente utilizzato in produzione, o se generi i dati di test continuamente, ti consigliamo di mantenere le pipeline di test in esecuzione per un periodo più lungo. Quest'ultimo consente di confrontare il comportamento con l'ambiente di produzione o persino con altre pipeline di test.