Best practice per flussi di lavoro altamente paralleli

Questa pagina fornisce indicazioni sulle best practice da seguire per creare ed eseguire flussi di lavoro HPC Dataflow altamente paralleli, tra cui come utilizzare il codice esterno nelle pipeline, come eseguire la pipeline e come gestire la gestione degli errori.

Includere codice esterno nella pipeline

Un elemento chiave di differenziazione per le pipeline altamente parallele è che usano il codice C++ all'interno DoFn piuttosto che uno degli SDK Apache Beam standard lingue diverse. Per le pipeline Java, per semplificare l'utilizzo delle librerie C++ nella pipeline, è consigliabile utilizzare chiamate di procedure esterne. Questa sezione descrive l'approccio generale utilizzato per eseguire codice esterno (C++) nelle pipeline Java.

Una definizione di pipeline Apache Beam ha diversi componenti chiave:

  • PCollections sono raccolte immutabili di elementi omogenei.
  • PTransforms vengono utilizzati per definire le trasformazioni di un PCollection che genera un altro PCollection.
  • La pipeline è il costrutto che consente, tramite il codice, di dichiarare interazioni tra PTransforms e PCollections. La pipeline è rappresentata come un grafo diretto aciclico (DAG).

Quando utilizzi codice di un linguaggio che non fa parte dell'SDK Apache Beam standard linguaggi, inserisci il codice in PTransform, che si trova all'interno di DoFn, e usare uno dei linguaggi SDK standard per definire la pipeline stessa. Consigliamo di utilizzare l'SDK Apache Beam Python per definire la pipeline, perché l'SDK Python ha una classe di utilità che utilizza un altro codice più semplice. Tuttavia, puoi utilizzare gli altri SDK Apache Beam.

Puoi utilizzare il codice per eseguire esperimenti rapidi senza richiedere una compilazione completa. Per un sistema di produzione, di solito crei i tuoi file binari, e ti offre la libertà di adattare il processo alle tue esigenze.

Il seguente diagramma illustra i due utilizzi dei dati della pipeline:

  • I dati vengono utilizzati per guidare il processo.
  • I dati vengono acquisiti durante l'elaborazione e uniti ai dati del conducente.

Due fasi dei dati della pipeline

In questa pagina, i dati primari (provenienti dalla fonte) sono indicati come dati relativi alla guida, a cui si riferiscono i dati secondari (dalla fase di elaborazione) in unione dei dati.

In un caso d'uso finanziario, i dati di base potrebbero essere di diverse centinaia di migliaia di operazioni. Ogni transazione deve essere elaborata insieme ai dati di mercato. In questo caso, i dati di mercato sono i dati uniti. In un caso d'uso multimediale, i dati di guida potrebbero essere file immagine che richiedono l'elaborazione, ma non altre origini dati pertanto non è necessario utilizzare i dati uniti.

Considerazioni sulle dimensioni dei dati relativi alla guida

Se le dimensioni dell'elemento di dati principali rientrano nell'intervallo di pochi megabyte, gestiscilo con il normale paradigma Apache Beam di creazione di un oggetto PCollection dall'origine e invio dell'oggetto alle trasformazioni Apache Beam per l'elaborazione.

Se le dimensioni dell'elemento dati di guida sono espresse in megabyte o nei gigabyte, come nel caso dei media, puoi inserire i dati di archiviazione ideale in Cloud Storage. Poi, nell'oggetto PCollection iniziale, fai riferimento alla l'URI di archiviazione e solo un riferimento URI ai dati utilizzati.

Considerazioni sulle dimensioni per l'unione dei dati

Se i dati di unione sono pari o inferiori a poche centinaia di megabyte, utilizza un input secondario per inviarli alle trasformazioni Apache Beam. L'input laterale invia il pacchetto di dati a ogni worker che ne ha bisogno.

Se i dati di unione rientrano nell'intervallo di gigabyte o terabyte, utilizza Bigtable o Cloud Storage per unire i dati di unione ai dati di guida, a seconda della natura dei dati. Bigtable è ideale per scenari finanziari in cui spesso si accede ai dati di mercato come coppie chiave-valore da Bigtable. Per ulteriori informazioni sulla progettazione Schema di Bigtable, inclusi suggerimenti per l'utilizzo dei dati delle serie temporali, consulta la seguente documentazione di Bigtable:

Esegui il codice esterno

Puoi eseguire codice esterno in Apache Beam in molti modi.

  • Crea un processo chiamato da un oggetto DoFn all'interno di un la trasformazione Dataflow.

  • Utilizza JNI con l'SDK Java.

  • Crea un processo secondario direttamente dall'oggetto DoFn. Sebbene questo approccio sia non è la più efficiente, è robusta e semplice da implementare. A causa dei potenziali problemi con l'uso di JNI, questa pagina mostra l'utilizzo di una chiamata a un subprocess.

Durante la progettazione del flusso di lavoro, considera la pipeline end-to-end completa. Qualsiasi inefficienze nell'esecuzione del processo sono compensate dal fatto che il trasferimento dei dati dall'origine al sink avviene tramite una una singola pipeline. Se confronti questo approccio con altri, considera i tempi end-to-end della pipeline e i costi end-to-end.

Estrai i binari negli host

Quando utilizzi un linguaggio Apache Beam nativo, l'SDK Apache Beam sposta automaticamente tutto il codice richiesto nei worker. Tuttavia, quando effettui una chiamata a codice esterno, devi spostare il codice manualmente.

File binari archiviati nei bucket

Per spostare il codice: L'esempio mostra i passaggi per l'SDK Apache Beam Java.

  1. Archivia il codice esterno compilato, insieme alle informazioni sul controllo delle versioni, in Cloud Storage.
  2. Nel metodo @Setup, crea un blocco sincronizzato per verificare se il file di codice è disponibile nella risorsa locale. Anziché implementare un controllo fisico, puoi confermare la disponibilità utilizzando una variabile statica al termine del primo thread.
  3. Se il file non è disponibile, utilizza la libreria client di Cloud Storage per eseguire il pull del file dal bucket Cloud Storage al worker locale. R l'approccio consigliato consiste nell'utilizzare Apache Beam FileSystems per questa attività.
  4. Dopo aver spostato il file, verifica che il bit di esecuzione sia impostato sul file di codice.
  5. In un sistema di produzione, controlla l'hash dei file binari per assicurarti che il file sia stato copiato correttamente.

Anche l'utilizzo della funzione Apache Beam filesToStage è un'opzione, ma rimuove alcuni dei vantaggi della capacità del runner di pacchettizzare e spostare automaticamente il codice Java. Inoltre, poiché la chiamata al processo secondario richiede una posizione del file assoluta, devi utilizzare il codice per determinare il percorso della classe e quindi la posizione del file spostato da filesToStage. Sconsigliamo di usare questo approccio.

Esegui i programmi binari esterni

Prima di poter eseguire il codice esterno, devi creare un wrapper. Scrivi questo wrapper nello stesso linguaggio del codice esterno (ad esempio, C++) o come uno script shell. Il wrapper ti consente di passare handle dei file e implementare ottimizzazioni come descritto nella sezione Elaborare il design per cicli CPU ridotti di questa pagina. Il wrapper non deve necessariamente essere sofisticato. Il seguente snippet mostra la struttura di un wrapper in C++.

int main(int argc, char* argv[])
{
    if(argc < 3){
        std::cerr << "Required return file and data to process" << '\n';
        return 1;
    }

    std::string returnFile = argv[1];
    std::string word = argv[2];

    std::ofstream myfile;
    myfile.open (returnFile);
    myfile << word;
    myfile.close();
    return 0;
}

Questo codice legge due parametri dall'elenco di argomenti. Il primo parametro è la posizione del file di ritorno in cui viene eseguito il push dei dati. Il secondo sono i dati sottoposti a eco dal codice all'utente. Nelle implementazioni reali, questo codice farebbe molto di più che stampare "Hello, world".

Dopo aver scritto il codice wrapper, esegui il codice esterno nel seguente modo:

  1. Trasmettere i dati ai file binari di codice esterni.
  2. Esegui i programmi binari, rileva eventuali errori e registra errori e risultati.
  3. Gestire le informazioni di logging.
  4. Acquisisci i dati dall'elaborazione completata.

Trasmettere i dati ai file binari

Per avviare il processo di esecuzione della libreria, trasmetti i dati al codice C++. In questo passaggio puoi sfruttare l'integrazione di Dataflow con altri strumenti Google Cloud. Uno strumento come Bigtable può gestire set di dati e gestire l'accesso a bassa latenza e ad alta contemporaneità, per accedere contemporaneamente al set di dati. Inoltre, Bigtable può pre-elaborare i dati, consentendo la loro definizione, l'arricchimento e il filtraggio. Tutto questo lavoro può essere svolto nelle trasformazioni di Apache Beam prima di eseguire il codice esterno.

Per un sistema di produzione, il percorso consigliato è utilizzare un buffer di protocollo per incapsulare i dati di input. Puoi convertire i dati di input in byte la codifica in base64 prima di passarla alla libreria esterna. I due per passare questi dati alla libreria esterna:

  • Piccoli dati di input. Per i dati di piccole dimensioni che non superano la lunghezza massima del sistema per un argomento del comando, passa l'argomento nella posizione 2 della procedura in fase di creazione con java.lang.ProcessBuilder.
  • Dati di input di grandi dimensioni. Per dimensioni dei dati più grandi, crea un file il cui nome include un UUID per contenere i dati richiesti dalla procedura.

Esegui il codice C++, individuazione degli errori e logging

La cattura e la gestione delle informazioni sugli errori è una parte fondamentale della pipeline. Le risorse utilizzate dal runner Dataflow sono temporanee e spesso difficili da esaminare i file di log dei worker. Devi assicurarti di acquisire e inviare tutte le informazioni utili al logging del runner Dataflow e di archiviare i dati di logging in uno o più bucket Cloud Storage.

L'approccio consigliato consiste nel reindirizzare stdout e stderr ai file, consente di evitare considerazioni relative all'esaurimento della memoria. Ad esempio, nel programma di esecuzione di Dataflow che chiama il codice C++, puoi includere righe come la seguente:

Java

  import java.lang.ProcessBuilder.Redirect;
  ...
      processbuilder.redirectError(Redirect.appendTo(errfile));
      processbuilder.redirectOutput(Redirect.appendTo(outFile));

Python

# Requires Apache Beam 2.34 or later.
stopping_times, bad_values = (
    integers
    | beam.Map(collatz.total_stopping_time).with_exception_handling(
        use_subprocess=True))

# Write the bad values to a side channel.
bad_values | 'WriteBadValues' >> beam.io.WriteToText(
    os.path.splitext(output_path)[0] + '-bad.txt')

Gestire le informazioni di logging

Molti casi d'uso prevedono l'elaborazione di milioni di elementi. Elaborazione riuscita genera log con poco o nessun valore, quindi devi prendere una decisione aziendale sulla conservazione dei dati di log. Ad esempio, considera queste alternative conservando tutti i dati di log:

  • Se le informazioni contenute nei log sono state elaborate correttamente con un elemento non è importante, non tenerlo.
  • Crea una logica che esegue il campionamento dei dati dei log, ad esempio il campionamento ogni 10.000 voci di log. Se l'elaborazione è omogenea, ad esempio quando molte iterazioni del codice generano dati di log essenzialmente identici, questo approccio offre un equilibrio efficace tra il mantenimento dei dati di log e l'ottimizzazione dell'elaborazione.

In condizioni di errore, la quantità di dati di cui viene eseguito il dump nei log potrebbe essere elevata. Una strategia efficace per gestire grandi quantità di dati dei log di errore è leggere le prime righe della voce del log e inviare solo queste righe a Cloud Logging. Puoi caricare il resto del file di log nei bucket Cloud Storage. Questo approccio ti consente di esaminare le prime righe dei log degli errori in un secondo momento e poi, se necessario, fai riferimento a Cloud Storage per l'intero file.

È utile anche controllare le dimensioni del file di log. Se le dimensioni del file sono pari a zero, puoi ignorarlo tranquillamente o registrare un semplice messaggio di log che indica che il file non conteneva dati.

Acquisisci dati dall'elaborazione completata

Non è consigliabile utilizzare stdout per passare il risultato del calcolo alla funzione DoFn. Anche altro codice chiamato dal codice C++ e persino il tuo codice potrebbe inviare messaggi a stdout, contaminando lo stream stdoutput che altrimenti contiene i dati di logging. È invece consigliabile una modifica al codice del wrapper C++ per consentire al codice di accettare un parametro che indica dove creare il file in cui è archiviato il valore. L'ideale è che questo deve essere archiviato in un modo indipendente dal linguaggio buffer di protocollo, che permette al codice C++ di ritrasmettere un oggetto al codice Java o Python. L'oggetto DoFn può leggere il risultato direttamente dal file e passare le informazioni sul risultato alla propria chiamata output.

L'esperienza ha dimostrato quanto sia importante correre i test delle unità che si occupano il processo stesso. È importante implementare un test delle unità che esegua in modo indipendente dalla pipeline Dataflow. Il debugging della biblioteca può essere eseguito in modo molto più efficiente se è autonoma e non deve eseguire l'intera pipeline.

Progetta l'elaborazione per cicli CPU brevi

La chiamata di un sottoprocesso comporta un overhead. A seconda del carico di lavoro, potresti dover svolgere un lavoro extra per ridurre il rapporto tra il lavoro svolto e il carico amministrativo di avvio e arresto del processo.

Nel caso d'uso dei media, le dimensioni dell'elemento di dati di guida potrebbero essere molto elevate, in megabyte o gigabyte. Di conseguenza, l'elaborazione di ogni elemento di dato può richiedere molti minuti. In questo caso, il costo della chiamata al processo secondario è irrilevante rispetto a il tempo di elaborazione complessivo. L'approccio migliore in questa situazione è fare in modo che un singolo elemento avvii il proprio processo.

Tuttavia, in altri casi d'uso, come la finanza, l'elaborazione richiede molta piccole unità di tempo di CPU (decine di millisecondi). In questo caso, il sovraccarico di chiamata del sottoprocesso è sproporzionatamente elevato. Una soluzione è utilizzare Apache Beam GroupByKey per creare lotti di 50-100 elementi da inserire e il processo di sviluppo. Ad esempio, puoi procedere nel seguente modo:

  • In un DoFn , crea una coppia chiave-valore. Se stai elaborando operazioni finanziarie, puoi utilizzare il numero di transazione come chiave. Se non hai un numero unico da utilizzare come chiave, puoi generare un checksum dai dati e utilizzare una funzione modulo per creare partizioni di 50 elementi.
  • Invia la chiave a una funzione GroupByKey.create, che restituisce una raccolta KV<key,Iterable<data>> contenente i 50 elementi che puoi inviare al processo.

Limita parallelismo dei worker

Quando lavori con una lingua supportata in modo nativo in l'esecuzione di Dataflow, non devi mai preoccuparti può accadere al worker. Dataflow ha molti processi che supervisionano controllo del flusso e thread in modalità flusso o batch.

Tuttavia, se utilizzi un linguaggio esterno come C++, tieni presente che stai facendo qualcosa di insolito avviando dei processi secondari. In modalità batch, il programma di esecuzione Dataflow utilizza un rapporto ridotto tra thread di lavoro e CPU rispetto alla modalità di streaming. È consigliabile, soprattutto in modalità di streaming, creare un semaforo all'interno della classe per controllare in modo più diretto il parallelismo di un singolo utente.

Ad esempio, con l'elaborazione multimediale, potresti non volere che centinaia di elementi di transcodifica vengano elaborati in parallelo da un singolo worker. In questi casi, puoi creare una classe di utilità che fornisca i permessi alla funzione DoFn per il lavoro da svolgere. L'utilizzo di questa classe ti consente di assumere il controllo diretto dei thread worker all'interno della pipeline.

Utilizza data sink ad alta capacità in Google Cloud

Una volta elaborati, i dati vengono inviati a un data sink. Il sink deve essere in grado di gestire il volume dei risultati creati dall'elaborazione della griglia soluzione.

Il seguente diagramma mostra alcuni dei sink disponibili in Google Cloud quando Dataflow sta eseguendo un carico di lavoro della griglia.

Destinazioni disponibili in Google Cloud

Bigtable, BigQuery e Pub/Sub possono gestire flussi di dati molto grandi. Ad esempio, ogni nodo Bigtable può gestire 10.000 inserimenti al secondo di dimensioni fino a 1 KB con una facile scalabilità orizzontale. Di conseguenza, un cluster Bigtable da 100 nodi può assorbire 1.000.000 di messaggi al secondo generati Griglia Dataflow.

Gestire gli errori di segfault

Quando utilizzi il codice C++ all'interno di una pipeline, devi decidere come gestire segfault, perché hanno ramificazioni non locali se non trattate correttamente. Il programma di esecuzione Dataflow crea i processi in base alle esigenze in Java, Python o Go, quindi assegna il lavoro ai processi sotto forma di bundle.

Se la chiamata al codice C++ viene eseguita utilizzando strumenti strettamente associati, come JNI o Cython e i segfault dei processi C++, il processo di chiamata e Java Virtual Anche la macchina (JVM) ha un arresto anomalo. In questo scenario, i punti dati errati non sono rilevabili. Per creare punti dati errati recuperabili, utilizzare un accoppiamento più ampio, che dirama i dati non validi e consente alla pipeline di continuare. Tuttavia, con un codice C++ maturo che completamente testati rispetto a tutte le varianti dei dati, puoi utilizzare meccanismi come Cython.

Passaggi successivi