Best Practices für stark parallele Workflows

Auf dieser Seite finden Sie Anleitungen zu den Best Practices, die Sie beim Erstellen und Ausführen von hochgradig parallelen Dataflow-HPC-Workflows befolgen sollten. Dazu gehört auch die Verwendung von externem Code in Ihren Pipelines, die Ausführung der Pipeline und die Verwaltung der Fehlerbehandlung.

Externen Code in die Pipeline einbinden

Ein wichtiges Unterscheidungsmerkmal für hochparallele Pipelines besteht darin, dass sie C++-Code in DoFn statt einer der Apache Beam SDK-Standardsprachen verwenden. Bei Java-Pipelines wird empfohlen, externe Prozeduraufrufe zu verwenden, um die Verwendung von C++-Bibliotheken in der Pipeline zu vereinfachen. In diesem Abschnitt wird ein allgemeiner Ansatz zum Ausführen von externem Code (C++) in Java-Pipelines beschrieben.

Eine Apache Beam-Pipelinedefinition besteht aus mehreren Schlüsselkomponenten:

  • PCollections sind unveränderliche Sammlungen homogener Elemente.
  • PTransforms werden verwendet, um die Transformationen zu einer PCollection zu definieren, die einen weiteren PCollection generiert.
  • Die Pipeline ist das Konstrukt, mit dem Sie über Code die Interaktionen zwischen PTransforms und PCollections deklarieren können. Die Pipeline wird als gerichteter azyklischer Graph (DAG) dargestellt.

Wenn Sie Code aus einer Sprache verwenden, die nicht in der Apache Beam SDK-Standardsprache vorliegt, platzieren Sie den Code in PTransform, der sich innerhalb von DoFn befindet, und verwenden Sie eine der Standard-SDK-Sprachen für die Definition der Pipeline selbst. Wir empfehlen, das Apache Beam Python SDK zum Definieren der Pipeline zu verwenden, da das Python SDK eine Dienstprogrammklasse hat, die die Verwendung anderen Codes vereinfacht. Sie können jedoch die anderen Apache Beam SDKs verwenden.

Sie können den Code für eigene Experimente verwenden, ohne dass ein vollständiger Build erforderlich ist. Für ein Produktionssystem erstellen Sie in der Regel eigene Binärdateien, um den Ablauf auf Ihre Anforderungen abzustimmen.

Das folgende Diagramm veranschaulicht die zwei Verwendungszwecke von Pipeline-Daten:

  • Daten werden verwendet, um den Prozess zu steuern.
  • Daten werden während der Verarbeitung erfasst und mit den Treiberdaten verknüpft.

Zwei Phasen der Pipeline-Daten

Auf dieser Seite werden primäre Daten (aus der Quelle) als Ausgangsdaten und sekundäre Daten (aus der Verarbeitungsphase) als Zugangsdaten bezeichnet.

In einer Finanzanwendung könnten zum Beispiel ein paar Hunderttausend Handelstransaktionen die Ausgangsdaten sein. Jede Transaktion muss in Verbindung mit Marktdaten verarbeitet werden. Die Marktdaten sind also die Zugangsdaten. In einer Medienanwendung könnten die zu verarbeitenden Bilddateien die Ausgangsdaten sein. Wenn dafür keine zusätzlichen Datenquellen erforderlich sind, gibt es eben keine Zugangsdaten.

Zur Größe der Ausgangsdaten

Wenn das Ausgangsdatenelement im unteren Megabytebereich rangiert, empfiehlt sich die Verwendung des normalen Apache Beam-Paradigmas, das die Erstellung eines PCollection-Objekts aus der Quelle vorsieht, das anschließend zur Verarbeitung an die Apache Beam-Transformationen übergeben wird.

Liegt das Ausgangsdatenelement medientypisch im hohen Megabyte- oder gar Gigabytebereich, können Sie die Ausgangsdaten in Cloud Storage ablegen. Im ersten PCollection-Objekt verwenden Sie dann den Speicher-URI als Referenz, sodass nur der URI auf die Daten verweist.

Zur Größe der Zugangsdaten

Wenn die Zugangsdaten einige Hundert Megabyte groß sind oder darunter, können Sie die Daten über einen Side Input in die Apache Beam-Transformationen laden. Der Side Input sendet das Datenpaket an jeden Worker, der es benötigt.

Liegen die Zugangsdaten im Gigabyte- oder Terabytebereich, verwenden Sie entweder Bigtable oder Cloud Storage, um sie abhängig vom Datentyp mit den Ausgangsdaten zusammenzuführen. Bigtable eignet sich gut für Finanzszenarien, wenn über eine Schlüssel/Wert-Paar-Zuordnung häufig auf Marktdaten zugegriffen wird. Weitere Informationen zum Entwerfen Ihres Bigtable-Schemas, einschließlich Empfehlungen für die Arbeit mit Zeitachsendaten, finden Sie in der folgenden Bigtable-Dokumentation:

Externen Code ausführen

Externer Code kann in Apache Beam auf viele Arten ausgeführt werden.

  • Erstellen Sie einen Prozess, der von einem DoFn-Objekt in einer Dataflow-Transformation aufgerufen wird.

  • JNI mit dem Java SDK verwenden.

  • Erstellen Sie einen Unterprozess direkt aus dem Objekt DoFn. Dieser Ansatz ist zwar nicht der effizienteste, aber er ist robust und einfach zu implementieren. Aufgrund der potenziellen Probleme bei der Verwendung von JNI wird auf dieser Seite die Verwendung eines Unterprozessaufrufs demonstriert.

Berücksichtigen Sie beim Entwerfen Ihres Workflows die gesamte End-to-End-Pipeline. Jegliche Ineffizienzen bei der Ausführung des Prozesses werden durch die Tatsache ausgeglichen, dass die Datenbewegung von der Quelle bis zur Senke mit einer einzigen Pipeline erfolgt. Wenn Sie den Ansatz mit anderen vergleichen, sollten Sie die End-to-End-Zeiten der Pipeline sowie die End-to-End-Kosten in Betracht ziehen.

Die Binärdateien in die Hosts laden

Wenn Sie eine native Apache Beam-Sprache verwenden, verschiebt das Apache Beam SDK den erforderlichen Code automatisch an die Worker. Wenn Sie jedoch externen Code aufrufen, müssen Sie den Code manuell verschieben.

In Buckets gespeicherte Binärdateien

So verschieben Sie den Code: In diesem Beispiel werden die Schritte für das Apache Beam Java SDK gezeigt.

  1. Speichern Sie den kompilierten externen Code zusammen mit den Versionsinformationen in Cloud Storage.
  2. Erstellen Sie in der Methode @Setup einen synchronisierten Block, der überprüft, ob die Codedatei auf der lokalen Ressource verfügbar ist. Anstelle einer physischen Prüfung können Sie die Verfügbarkeit durch eine statische Variable bestätigen lassen, wenn der erste Thread beendet ist.
  3. Ist die Datei nicht verfügbar, laden Sie sie mithilfe der Cloud Storage-Clientbibliothek aus dem Cloud Storage-Bucket in den lokalen Worker. Für diese Aufgabe empfehlen wir die Verwendung der Apache Beam-Klasse FileSystems.
  4. Prüfen Sie, nachdem die Datei verschoben wurde, ob das Ausführungsbit auf die Codedatei festgelegt wurde.
  5. Prüfen Sie in einem Produktionssystem den Hash der Binärdateien, um sicherzugehen, dass die Datei korrekt kopiert wurde.

Hinweis: Sie können auch die Apache Beam-Funktion filesToStage verwenden. Dabei müssen Sie jedoch auf verschiedene Vorteile verzichten, die der Runner bezüglich der automatischen Verpackung und dem Verschieben des Java-Codes bietet. Außerdem müssen Sie, da der Aufruf des Unterprozesses einen absoluten Dateispeicherort benötigt, den Klassenpfad und damit den Speicherort der Datei, die von filesToStage verschoben wird, programmatisch festlegen. Wir raten von diesem Ansatz jedoch ab.

Externe Binärdateien ausführen

Bevor Sie externen Code ausführen können, müssen Sie einen Wrapper dafür erstellen. Den Wrapper schreiben Sie in der gleichen Sprache wie den externen Code (z. B. C++) oder als Shell-Skript. Mit dem Wrapper können Sie Dateihandles übergeben und Optimierungen vornehmen, wie im Abschnitt Verarbeitung für kurze CPU-Zyklen optimieren auf dieser Seite beschrieben. Der Wrapper muss kein Kunstwerk sein. Das folgende Code-Snippet zeigt einen einfachen Wrapper in C++.

int main(int argc, char* argv[])
{
    if(argc < 3){
        std::cerr << "Required return file and data to process" << '\n';
        return 1;
    }

    std::string returnFile = argv[1];
    std::string word = argv[2];

    std::ofstream myfile;
    myfile.open (returnFile);
    myfile << word;
    myfile.close();
    return 0;
}

Der Code liest zwei Parameter aus der Argumentliste. Der erste Parameter ist der Speicherort der Rückgabedatei, an die die Daten gesendet werden. Der zweite Parameter sind die Daten, die der Code dem Nutzer zurückgibt. In einer echten Implementierung würde dieser Code natürlich mehr als „Hallo Welt“ liefern!

Führen Sie nach dem Schreiben des Wrapper-Codes den externen Code so aus:

  1. Die Daten an die Binärdateien mit dem externen Code senden.
  2. Die Binärdateien ausführen, Fehler abfangen und Fehler und Ergebnisse protokollieren.
  3. Die Logging-Informationen verarbeiten.
  4. Die Daten aus der Verarbeitung erfassen.

Daten an die Binärdateien übertragen

Übergeben Sie Daten an den C++-Code, um die Ausführung der Bibliothek zu starten. In diesem Schritt können Sie die Dataflow-Integration mit anderen Google Cloud-Tools nutzen. Ein Tool wie Bigtable kann mit sehr großen Datasets umgehen und viele gleichzeitige Zugriffe mit geringer Latenz bewältigen. Dadurch können Tausende von Kernen gleichzeitig auf das Dataset zugreifen. Außerdem kann Bigtable Daten vorverarbeiten und vorbereiten, anreichern und filtern. All diese Schritte können in Beam-Transformationen vor der Ausführung des externen Codes erledigt werden.

In einem Produktionssystem empfiehlt es sich, einen Protokollpuffer zu verwenden, um die Eingabedaten zu kapseln. Sie können die Eingabedaten in Byte konvertieren und mit base64 codieren, bevor Sie sie an die externe Bibliothek übergeben. Es gibt zwei Möglichkeiten, diese Daten an die externe Bibliothek zu übergeben:

  • Kleine Eingabedaten. Für kleine Daten, die die maximale Länge des Systems für ein Befehlsargument nicht überschreiten, übergeben Sie das Argument in Position 2 des mit java.lang.ProcessBuilder erstellten Prozesses.
  • Große Eingabedaten. Für größere Daten erstellen Sie eine Datei, deren Name eine UUID enthält, und speichern darin die für den Prozess erforderlichen Daten.

C++-Code ausführen, Fehler abfangen und Logging

Die Erfassung und Verarbeitung von Fehlerinformationen ist ein wichtiger Teil Ihrer Pipeline. Die vom Dataflow-Runner verwendeten Ressourcen sind sitzungsspezifisch und es ist oft schwierig, Worker-Logdateien zu prüfen. Alle nützlichen Informationen müssen erfasst und an die Logging-Funktion von Dataflow weitergeleitet werden. Die Logging-Daten müssen in einem oder mehreren Cloud Storage-Buckets gespeichert werden.

Der empfohlene Ansatz besteht darin, stdout und stderr auf Dateien umzuleiten, damit Sie sich nicht um einen möglichen Speichermangel kümmern müssen. Im Dataflow-Runner, der den C++-Code aufruft, können Sie beispielsweise folgende Zeilen einfügen:

Java

  import java.lang.ProcessBuilder.Redirect;
  ...
      processbuilder.redirectError(Redirect.appendTo(errfile));
      processbuilder.redirectOutput(Redirect.appendTo(outFile));

Python

# Requires Apache Beam 2.34 or later.
stopping_times, bad_values = (
    integers
    | beam.Map(collatz.total_stopping_time).with_exception_handling(
        use_subprocess=True))

# Write the bad values to a side channel.
bad_values | 'WriteBadValues' >> beam.io.WriteToText(
    os.path.splitext(output_path)[0] + '-bad.txt')

Logging-Informationen verarbeiten

In vielen Anwendungsfällen müssen Millionen von Elementen verarbeitet werden. Bei erfolgreicher Verarbeitung liefern die Logs nur wenig oder keine werthaltigen Informationen. Daher müssen Sie entscheiden, ob Log-Daten beibehalten oder verworfen werden sollen. Sie haben folgende Alternativen, wenn Sie die Log-Daten nicht behalten möchten:

  • Falls die Logs keine nützlichen Daten aus der Elementverarbeitung enthalten, sollten Sie sie löschen.
  • Schreiben Sie eine Logik, um die Log-Daten stichprobenweise abzufragen, zum Beispiel immer 10.000 Log-Einträge. Wenn sich eine homogene Verarbeitung abzeichnet (das heißt, viele Code-Iterationen erzeugen im Wesentlichen identische Log-Daten), bietet sich dies als ausgewogene Möglichkeit an, Log-Daten zu speichern und gleichzeitig die Verarbeitung zu optimieren.

Bei einem Fehler kann die Datenmenge in den Logs sehr stark anwachsen. Eine effektive Strategie zum Umgang mit großen Mengen von Fehlerlogdaten besteht darin, die ersten Zeilen des Logeintrags zu lesen und nur diese Zeilen an Cloud Logging zu verschieben. Sie können den Rest der Logdatei in Cloud Storage-Buckets laden. So können Sie sich später die ersten Zeilen der Fehlerlogs ansehen und bei Bedarf auf die gesamte Datei in Cloud Storage verweisen.

Die Größe der Log-datei zu prüfen ist auch hilfreich. Beträgt sie null, können Sie die Datei ignorieren oder eine einfache Lognachricht ausgeben, dass die Datei keine Daten enthält.

Daten aus der abgeschlossenen Verarbeitung erfassen

Zur Übergabe des Berechnungsergebnisses an die DoFn-Funktion sollten Sie nicht stdout verwenden. Anderer Code, den Ihr C++-Code aufruft, und sogar Ihr eigener Code, könnten ebenfalls Nachrichten an stdout senden und den stdoutput-Stream, der ansonsten Logging-Daten enthält, vollschreiben. Stattdessen empfiehlt es sich, den C++-Wrapper-Code so zu ändern, dass er einen Parameter akzeptiert, der angibt, wo eine Datei erstellt werden soll, die den Wert speichert. Im Idealfall sollte die Datei mithilfe von Protokollpuffern sprachneutral gespeichert werden, damit der C++-Code ein Objekt an den Java- oder Python-Code zurückgeben kann. Das DoFn-Objekt kann das Ergebnis direkt aus der Datei lesen und die Ergebnisse an seinen eigenen output-Aufruf übergeben.

Die Erfahrung hat gezeigt, dass Einheitentests zur Überprüfung eines Vorgangs sehr hilfreich sein können. Implementieren Sie daher einen Einheitentest, der den Prozess unabhängig von der Dataflow-Pipeline ausführt. Fehler in der Bibliothek lassen sich viel effizienter beheben, wenn sie unabhängig ist und nicht die gesamte Pipeline ausgeführt werden muss.

Verarbeitung für kurze CPU-Zyklen optimieren

Der Aufruf eines Unterprozesses erzeugt Aufwand. Abhängig von der Arbeitslast sind möglicherweise zusätzliche Schritte nötig, um das Verhältnis zwischen der eigentlichen Arbeit und dem Verwaltungsaufwand für das Starten und Herunterfahren des Prozesses auf ein vernünftiges Maß zu reduzieren.

Im Medienanwendungsfall kann das Ausgangsdatenelement im hohen Megabyte- oder gar Gigabytebereich liegen. Daher kann die Verarbeitung jedes Datenelements viele Minuten dauern. In diesem Fall sind die Kosten für den Aufruf des Unterprozesses im Vergleich zur Gesamtverarbeitungszeit unbedeutend. Der beste Ansatz in dieser Situation besteht darin, jedes Element seinen eigenen Prozess starten zu lassen.

In anderen Anwendungsfällen, z. B. für die Finanzen, erfordert die Verarbeitung jedoch sehr kleine CPU-Zeiteinheiten (zehntel Millisekunden). In diesem Fall ist der Overhead für den Aufruf des Unterprozesses überproportional groß. Eine Lösung für dieses Problem besteht darin, mit der GroupByKey-Transformation Apache Beam-Batches zu erstellen, die Batches zwischen 50 und 100 Elementen enthalten, die in den Prozess eingespeist werden. Sie können dazu beispielsweise so vorgehen:

  • Sie erstellen ein Schlüssel/Wert-Paar in einer DoFn-Funktion. Bei der Verarbeitung von Handelstransaktionen könnten Sie die Transaktionsnummer als Schlüssel verwenden. Wenn Sie keine eindeutige Zahl für den Schlüssel haben, können Sie eine Prüfsumme aus den Daten generieren und mit einer Modulo-Funktion Partitionen mit je 50 Elementen erstellen.
  • Sie senden den Schlüssel an eine GroupByKey.create-Funktion, die im Gegenzug eine KV<key,Iterable<data>>-Sammlung mit den 50 Elementen zurückgibt, die Sie dann an den Vorgang übergeben.

Parallelität der Worker begrenzen

Wenn Sie mit einer Sprache arbeiten, die im Dataflow-Runner nativ unterstützt wird, müssen Sie sich keine Gedanken darüber machen, was mit dem Worker geschieht. Dataflow hat viele Prozesse zur Überwachung der Flusssteuerung und Threads im Batch- oder Stream-Modus.

Wenn Sie hingegen mit einer externen Sprache wie C++ arbeiten, seien Sie sich darüber im Klaren, dass der Start von Unterprozessen eine eher ungewöhnliche Vorgehensweise ist. Im Batch-Modus verwendet der Dataflow-Runner im Vergleich zum Streaming-Modus ein kleines Verhältnis aus Working-Threads zu CPUs. Insbesondere im Streaming-Modus empfiehlt es sich, ein Semaphor innerhalb der Klasse zu erstellen, um die Parallelität einzelner Worker direkter zu steuern.

Bei der Medienverarbeitung ist es beispielsweise nicht wünschenswert, Hunderte von Transcodierungselementen von einem einzelnen Worker parallel verarbeiten zu lassen. In solchen Fällen können Sie eine Dienstprogrammklasse erstellen, die der DoFn-Funktion Berechtigungen für die auszuführende Arbeit ausstellt. Mit dieser Klasse können Sie die Worker-Threads in der Pipeline direkt steuern.

Datensenken mit hoher Kapazität in Google Cloud verwenden

Nachdem die Daten verarbeitet wurden, werden sie an eine Datensenke gesendet. Die Senke muss in der Lage sein, die Ergebnismenge zu verarbeiten, die von der Grid-Verarbeitungslösung erzeugt wird.

Das folgende Diagramm zeigt einige der in Google Cloud verfügbaren Senken, wenn Dataflow eine Grid-Arbeitslast ausführt.

In Google Cloud verfügbare Senken

Bigtable, BigQuery und Cloud Pub/Sub können alle mit sehr großen Datenstreams umgehen. Beispielsweise kann jeder Bigtable-Knoten 10.000 Inserts pro Sekunde bis zu einer Größe von 1 KB mit einfacher horizontaler Skalierbarkeit verarbeiten. In diesem Fall kann ein Bigtable-Cluster mit 100 Knoten 1.000.000 Nachrichten pro Sekunde aufnehmen, die vom Dataflow-Grid generiert werden.

segfaults verwalten

Wenn Sie C++-Code in einer Pipeline verwenden, müssen Sie entscheiden, wie Sie Segfaults verwalten, da sie nicht-lokale Auswirkungen haben, wenn sie nicht richtig behandelt werden. Der Dataflow-Runner erstellt Prozesse nach Bedarf in Java, Python oder Go und weist den Prozessen dann Arbeit in Form von Bundles zu.

Wenn der Aufruf des C++-Codes mit eng gekoppelten Tools wie JNI oder Cython erfolgt und der C++-Prozess segfaults generiert, stürzen auch der aufrufende Prozess und die Java Virtual Machine (JVM) ab. In diesem Szenario sind fehlerhafte Datenpunkte nicht erkennbar. Verwenden Sie eine weniger enge Kopplung, um fehlerhafte Datenpunkte abzufangen. Dadurch werden fehlerhafte Daten abgezweigt und die Pipeline kann fortgesetzt werden. Mit ausgereiftem C++-Code, der vollständig für alle Datenvarianten getestet wurde, können Sie jedoch Mechanismen wie Cython verwenden.

Nächste Schritte