Best practice per flussi di lavoro altamente paralleli

Questa pagina fornisce indicazioni sulle best practice da seguire per la creazione e l'esecuzione di flussi di lavoro HPC altamente paralleli di Dataflow, tra cui come utilizzare il codice esterno nelle pipeline e come gestire la gestione degli errori.

Includi codice esterno nella pipeline

Un elemento distintivo fondamentale per le pipeline altamente parallele è che utilizzano il codice C++ all'interno di DoFn anziché uno dei linguaggi standard dell'SDK Apache Beam. Per le pipeline Java, per semplificare l'utilizzo delle librerie C++ nella pipeline, è consigliabile utilizzare chiamate di procedure esterne. Questa sezione descrive l'approccio generale utilizzato per l'esecuzione di codice esterno (C++) nelle pipeline Java.

Una definizione di pipeline Apache Beam ha diversi componenti chiave:

  • PCollections sono raccolte immutabili di elementi omogenei.
  • PTransforms sono utilizzati per definire le trasformazioni in un PCollection che genera un altro PCollection.
  • La pipeline è il costrutto che consente, tramite il codice, di dichiarare le interazioni tra PTransforms e PCollections. La pipeline è rappresentata come un grafico aciclico diretto (DAG, Directed Acyclic Graph).

Quando utilizzi il codice da un linguaggio che non fa parte dei linguaggi standard dell'SDK Apache Beam, inseriscilo nella PTransform, che si trova all'interno di DoFn, e utilizza uno dei linguaggi SDK standard per definire la pipeline stessa. Ti consigliamo di utilizzare l'SDK Apache Beam Python per definire la pipeline, perché l'SDK Python ha una classe di utilità che rende più semplice l'utilizzo di altro codice. Tuttavia, puoi utilizzare gli altri SDK Apache Beam.

Puoi utilizzare il codice per condurre esperimenti rapidi senza richiedere una build completa. Nel caso di un sistema di produzione, in genere crei i tuoi programmi binari, il che ti offre la libertà di ottimizzare il processo in base alle tue esigenze.

Il seguente diagramma illustra i due utilizzi dei dati della pipeline:

  • I dati vengono utilizzati per guidare il processo.
  • I dati vengono acquisiti durante l'elaborazione e uniti ai dati dei conducenti.

Due fasi di dati della pipeline

In questa pagina, i dati primari (dall'origine) sono indicati come dati di trasferimento, mentre i dati secondari (dalla fase di elaborazione) sono indicati come unione dei dati.

In un caso d'uso finanziario, i dati sulla guida potrebbero essere di alcune centinaia di migliaia di operazioni. Ogni transazione deve essere elaborata in base ai dati di mercato. In questo caso, i dati di mercato sono i dati di unione. In un caso d'uso dei media, i dati di guida potrebbero essere file immagine che richiedono l'elaborazione, ma non altre origini dati e, pertanto, non utilizzano l'unione dei dati.

Considerazioni sulle dimensioni dei dati di guida

Se le dimensioni dell'elemento di dati di guida rientrano nell'intervallo di meno megabyte, considerale con il normale paradigma Apache Beam che prevede la creazione di un oggetto PCollection dall'origine e l'invio dell'oggetto alle trasformazioni Apache Beam per l'elaborazione.

Se le dimensioni dell'elemento di dati di guida sono elevate in megabyte o gigabyte, come di consueto per i contenuti multimediali, puoi inserire i dati di guida in Cloud Storage. Quindi, nell'oggetto PCollection iniziale, fai riferimento all'URI di archiviazione e solo un riferimento URI ai dati utilizzati.

Considerazioni sulle dimensioni per l'unione dei dati

Se i dati di unione sono di alcune centinaia di megabyte o meno, utilizza un input laterale per trasferirli nelle trasformazioni di Apache Beam. L'input secondario invia il pacchetto di dati a ogni worker che ne ha bisogno.

Se i dati di unione sono compresi nell'intervallo di gigabyte o terabyte, utilizza Bigtable o Cloud Storage per unire i dati di unione a quelli di trasmissione, a seconda della natura dei dati. Bigtable è ideale per scenari finanziari in cui ai dati di mercato si accede spesso sotto forma di ricerche di coppie chiave-valore da Bigtable. Per ulteriori informazioni sulla progettazione dello schema di Bigtable, inclusi i suggerimenti per l'utilizzo dei dati delle serie temporali, consulta la seguente documentazione di Bigtable:

Esegui il codice esterno

Puoi eseguire codice esterno in Apache Beam in molti modi.

  • Creare un processo chiamato da un oggetto DoFn all'interno di una trasformazione Dataflow.

  • Utilizza JNI con l'SDK Java.

  • Crea un processo secondario direttamente dall'oggetto DoFn. Sebbene questo approccio non sia il più efficiente, è robusto e semplice da implementare. A causa dei potenziali problemi relativi all'utilizzo di JNI, questa pagina illustra l'utilizzo di una chiamata di sottoprocesso.

Durante la progettazione del flusso di lavoro, considera l'intera pipeline end-to-end. Eventuali inefficienze nell'esecuzione del processo sono compensate dal fatto che lo spostamento dei dati dall'origine al sink avviene con una singola pipeline. Se confronti questo approccio con altri, esamina i tempi end-to-end della pipeline e i costi end-to-end.

Esegui il pull dei programmi binari negli host

Quando utilizzi un linguaggio Apache Beam nativo, l'SDK Apache Beam trasferisce automaticamente tutto il codice necessario ai worker. Tuttavia, quando effettui una chiamata a un codice esterno, devi spostarlo manualmente.

File binari archiviati nei bucket

Per spostare il codice: L'esempio illustra i passaggi per l'SDK Java Apache Beam.

  1. Archivia il codice esterno compilato, insieme alle informazioni sul controllo delle versioni, in Cloud Storage.
  2. Nel metodo @Setup, crea un blocco sincronizzato per verificare se il file di codice è disponibile nella risorsa locale. Anziché implementare un controllo fisico, puoi confermare la disponibilità utilizzando una variabile statica al termine del primo thread.
  3. Se il file non è disponibile, utilizza la libreria client di Cloud Storage per eseguire il pull del file dal bucket Cloud Storage al worker locale. Un approccio consigliato è utilizzare la classe Apache Beam FileSystems per questa attività.
  4. Dopo lo spostamento del file, verifica che il bit di esecuzione sia impostato sul file di codice.
  5. In un sistema di produzione, controlla l'hash dei programmi binari per assicurarti che il file sia stato copiato correttamente.

Anche l'utilizzo della funzione Apache Beam filesToStage è un'opzione, ma elimina alcuni dei vantaggi della capacità del runner di pacchettizzare e spostare automaticamente il codice Java. Inoltre, poiché la chiamata al sottoprocesso richiede una posizione del file assoluta, devi utilizzare il codice per determinare il percorso della classe e, di conseguenza, la posizione del file spostato da filesToStage. Sconsigliamo di utilizzare questo approccio.

Esegui i file binari esterni

Prima di poter eseguire codice esterno, devi creare un wrapper. Scrivi questo wrapper nello stesso linguaggio del codice esterno (ad esempio C++) o come script shell. Il wrapper consente di trasmettere handle dei file e implementare le ottimizzazioni come descritto nella sezione Progettazione dell'elaborazione per cicli di CPU ridotti in questa pagina. Il wrapper non deve necessariamente essere sofisticato. Lo snippet seguente mostra il contorno di un wrapper in C++.

int main(int argc, char* argv[])
{
    if(argc < 3){
        std::cerr << "Required return file and data to process" << '\n';
        return 1;
    }

    std::string returnFile = argv[1];
    std::string word = argv[2];

    std::ofstream myfile;
    myfile.open (returnFile);
    myfile << word;
    myfile.close();
    return 0;
}

Questo codice legge due parametri dall'elenco di argomenti. Il primo parametro è la posizione del file restituito in cui vengono inviati i dati. Il secondo parametro è costituito dai dati che il codice mostra all'utente. Nelle implementazioni reali, questo codice andrebbe oltre l'eco "Hello, world".

Dopo aver scritto il codice wrapper, esegui il codice esterno nel seguente modo:

  1. Trasmettere i dati ai file binari del codice esterno.
  2. Esegui i programmi binari, individua eventuali errori, registra errori e risultati.
  3. Gestisci le informazioni di logging.
  4. Acquisisci i dati dal completamento dell'elaborazione.

Trasmettere i dati ai programmi binari

Per avviare il processo di esecuzione della libreria, trasmetti i dati al codice C++. In questo passaggio puoi sfruttare l'integrazione di Dataflow con altri strumenti di Google Cloud. Uno strumento come Bigtable è in grado di gestire set di dati molto grandi e di gestire l'accesso a bassa latenza e alta contemporaneità, il che consente a migliaia di core di accedere simultaneamente al set di dati. Inoltre, Bigtable può pre-elaborare i dati, consentendone la modellazione, l'arricchimento e l'applicazione di filtri. Tutto questo può essere eseguito nelle transforms di Apache Beam prima di eseguire il codice esterno.

Per un sistema di produzione, il percorso consigliato consiste nell'utilizzare un buffer di protocollo per incapsulare i dati di input. Puoi convertire i dati di input in byte e codificarli in base 64 prima di passarli alla libreria esterna. I due modi per passare questi dati alla libreria esterna sono i seguenti:

  • Dati di input ridotti. Per dati di piccole dimensioni che non superano la lunghezza massima del sistema per un argomento del comando, passa l'argomento nella posizione 2 del processo creato con java.lang.ProcessBuilder.
  • Dati di input di grandi dimensioni. Per dimensioni di dati più grandi, crea un file il cui nome includa un UUID per contenere i dati richiesti dal processo.

Esegui il codice C++, rileva gli errori e registra

L'acquisizione e la gestione delle informazioni sugli errori è una parte fondamentale della pipeline. Le risorse utilizzate dall'esecutore di Dataflow sono temporanee e spesso è difficile esaminare i file di log dei worker. Devi assicurarti di acquisire e inviare tutte le informazioni utili al logging esecutivo di Dataflow e di archiviare i dati di logging in uno o più bucket Cloud Storage.

L'approccio consigliato è reindirizzare stdout e stderr ai file, in modo da evitare considerazioni relative all'esaurimento della memoria. Ad esempio, nell'esecutore Dataflow che chiama il codice C++, potresti includere righe come la seguente:

Java

  import java.lang.ProcessBuilder.Redirect;
  ...
      processbuilder.redirectError(Redirect.appendTo(errfile));
      processbuilder.redirectOutput(Redirect.appendTo(outFile));

Python

# Requires Apache Beam 2.34 or later.
stopping_times, bad_values = (
    integers
    | beam.Map(collatz.total_stopping_time).with_exception_handling(
        use_subprocess=True))

# Write the bad values to a side channel.
bad_values | 'WriteBadValues' >> beam.io.WriteToText(
    os.path.splitext(output_path)[0] + '-bad.txt')

Gestire le informazioni di logging

Molti casi d'uso prevedono l'elaborazione di milioni di elementi. Un'elaborazione corretta genera log con valore scarso o nullo, quindi devi prendere una decisione aziendale sulla conservazione dei dati di log. Prendi in considerazione, ad esempio, queste alternative alla conservazione di tutti i dati dei log:

  • Se le informazioni contenute nei log provenienti da un'elaborazione corretta degli elementi non sono importanti, non conservarle.
  • Crea una logica che campiona i dati di log, ad esempio campionando solo ogni 10.000 voci di log. Se l'elaborazione è omogenea, ad esempio quando molte iterazioni del codice generano dati di log sostanzialmente identici, questo approccio fornisce un equilibrio efficace tra la conservazione dei dati di log e l'ottimizzazione dell'elaborazione.

In condizioni di errore, la quantità di dati scaricati nei log potrebbe essere elevata. Una strategia efficace per la gestione di grandi quantità di dati dei log degli errori consiste nel leggere le prime righe della voce di log ed eseguirne il push su Cloud Logging. Puoi caricare il resto del file di log nei bucket Cloud Storage. Questo approccio consente di esaminare le prime righe dei log degli errori in un secondo momento e, se necessario, fare riferimento a Cloud Storage per l'intero file.

È utile anche controllare le dimensioni del file di log. Se la dimensione del file è zero, puoi tranquillamente ignorarla o registrare un semplice messaggio di log indicante che il file non conteneva dati.

Acquisisci i dati dal completamento dell'elaborazione

Non è consigliabile utilizzare stdout per ritrasmettere il risultato del calcolo alla funzione DoFn. Anche altro codice chiamato dal codice C++, ed anche il tuo codice, potrebbero inviare messaggi a stdout, inquinando il flusso stdoutput che altrimenti contiene i dati di logging. È consigliabile, invece, apportare una modifica al codice del wrapper C++ per consentire al codice di accettare un parametro che indica dove creare il file in cui è archiviato il valore. Idealmente, questo file dovrebbe essere archiviato in modo indipendente dal linguaggio utilizzando i buffer di protocollo, che consentono al codice C++ di passare un oggetto al codice Java o Python. L'oggetto DoFn può leggere il risultato direttamente dal file e passare le informazioni sui risultati alla propria chiamata output.

L'esperienza ha dimostrato l'importanza di eseguire test delle unità con il processo stesso. È importante implementare un test delle unità che esegua il processo indipendentemente dalla pipeline Dataflow. Il debug della libreria può essere eseguito in modo molto più efficiente se è indipendente e non deve eseguire l'intera pipeline.

Elaborazione della progettazione per cicli di CPU ridotti

La chiamata a un processo secondario ha un overhead. A seconda del carico di lavoro, potrebbe essere necessario eseguire ulteriori operazioni per ridurre il rapporto tra il lavoro in corso e il sovraccarico amministrativo dovuto all'avvio e all'arresto del processo.

Nel caso d'uso multimediale, le dimensioni dell'elemento dei dati di guida potrebbero essere espressa in megabyte o gigabyte. Di conseguenza, l'elaborazione di ogni elemento di dati può richiedere molti minuti. In questo caso, il costo della chiamata al processo secondario non è significativo rispetto al tempo di elaborazione complessivo. L'approccio migliore in questa situazione è che ogni singolo elemento avvii il proprio processo.

Tuttavia, in altri casi d'uso, come quello finanziario, l'elaborazione richiede unità molto piccole di tempo di CPU (decine di millisecondi). In tal caso, l'overhead associato alla chiamata del sottoprocesso è sproporzionatamente elevato. Una soluzione a questo problema è utilizzare la trasformazione GroupByKey di Apache Beam per creare batch compresi tra 50 e 100 elementi da inserire nel processo. Ad esempio, puoi procedere nel seguente modo:

  • In una funzione DoFn, crea una coppia chiave-valore. Se stai elaborando transazioni finanziarie, puoi utilizzare il numero commerciale come chiave. Se non hai un numero univoco da utilizzare come chiave, puoi generare un checksum dai dati e utilizzare una funzione modulo per creare partizioni di 50 elementi.
  • Invia la chiave a una funzione GroupByKey.create, che restituisce una raccolta KV<key,Iterable<data>> contenente i 50 elementi che puoi inviare al processo.

Limita il parallelismo dei worker

Quando lavori con un linguaggio supportato in modo nativo nell'esecuzione di Dataflow, non devi mai pensare a cosa sta succedendo al worker. Dataflow ha molti processi che supervisionano il controllo del flusso e i thread in modalità batch o flusso.

Tuttavia, se utilizzi un linguaggio esterno come C++, tieni presente che stai facendo qualcosa di anomalo avviando dei processi secondari. In modalità batch, l'esecutore di Dataflow utilizza un rapporto ridotto di thread di lavoro e CPU rispetto alla modalità di elaborazione in modalità flusso. Si consiglia di creare un semaforo all'interno della classe, soprattutto in modalità flusso di dati, per controllare in modo più diretto il parallelismo di un singolo worker.

Ad esempio, con l'elaborazione dei contenuti multimediali, potresti non volere che centinaia di elementi di transcodifica vengano elaborati in parallelo da un singolo worker. In casi come questi, puoi creare una classe di utilità che fornisca i permessi alla funzione DoFn per il lavoro da eseguire. L'uso di questa classe consente di assumere il controllo diretto dei thread worker all'interno della pipeline.

Utilizza data sink ad alta capacità in Google Cloud

Una volta elaborati, i dati vengono inviati a un data sink. Il sink deve essere in grado di gestire il volume dei risultati creati dalla soluzione di elaborazione della griglia.

Il seguente diagramma mostra alcuni dei sink disponibili in Google Cloud quando Dataflow esegue un carico di lavoro della griglia.

Sink disponibili in Google Cloud

Bigtable, BigQuery e Pub/Sub possono gestire flussi di dati molto grandi. Ad esempio, ogni nodo Bigtable può gestire 10.000 inserti al secondo di dimensioni fino a 1000 con una facile scalabilità orizzontale. Di conseguenza, un cluster Bigtable con 100 nodi può assorbire 1.000.000 di messaggi al secondo generati dalla griglia Dataflow.

Gestisci segfault

Quando utilizzi il codice C++ all'interno di una pipeline, devi decidere come gestire i segnali Segfault, perché hanno ramificazioni non locali se non gestite correttamente. L'esecutore di Dataflow crea i processi in base alle esigenze in Java, Python o Go, quindi assegna il lavoro ai processi sotto forma di bundle.

Se la chiamata al codice C++ viene eseguita utilizzando strumenti strettamente accoppiati, come JNI o Cython, e i segfault del processo C++, anche il processo di chiamata e la JVM (Java Virtual Machine) si arresta in modo anomalo. In questo scenario non è possibile individuare i punti dati errati. Per rendere i punti dati errati che possono essere memorizzati nella cache, utilizza un accoppiamento più basso, che dirama i dati non validi e consente il proseguimento della pipeline. Tuttavia, con un codice C++ maturo testato completamente su tutte le varianti dei dati, puoi usare meccanismi come Cython.

Passaggi successivi