Bonnes pratiques pour les workflows hautement parallèles

Cette page fournit des conseils sur les bonnes pratiques à suivre lors de la création et de l'exécution de workflows Dataflow HPC hautement parallèles, y compris comment utiliser du code externe dans vos pipelines, comment exécuter le pipeline, et comment gérer le traitement des erreurs.

Inclure du code externe dans votre pipeline

Les pipelines hautement parallèles se distinguent par le fait qu'ils utilisent du code C++ dans la fonction DoFn plutôt que l'un des langages standards du SDK Apache Beam. Pour les pipelines Java, il est recommandé d'utiliser des appels de procédure externes afin de faciliter l'utilisation de bibliothèques C++ dans le pipeline. Cette section décrit l'approche générale utilisée pour exécuter du code externe (C++) dans les pipelines Java.

Une définition de pipeline Apache Beam comporte plusieurs composants clés :

  • Les PCollections sont des collections immuables d'éléments homogènes.
  • Les PTransforms sont utilisées pour définir les transformations sur une PCollection qui génère une autre PCollection.
  • Le pipeline est la construction qui vous permet, via le code, de déclarer les interactions entre PTransforms et PCollections. Le pipeline est représenté sous forme de graphe orienté acyclique (DAG).

Lorsque vous utilisez du code dans un langage qui n'est pas l'un des langages standards du SDK Apache Beam, placez le code dans PTransform, qui se trouve dans DoFn, et utilisez l'un des langages standards du SDK pour définir le pipeline lui-même. Nous vous recommandons d'utiliser le SDK Python d'Apache Beam pour définir le pipeline, car il comporte une classe utilitaire qui simplifie l'utilisation d'autres codes. Vous pouvez toutefois utiliser les autres SDK Apache Beam.

Vous pouvez utiliser ce code pour faire des expériences rapides sans toutefois nécessiter une compilation complète. Pour un système de production, vous créez généralement vos propres binaires, ce qui vous donne la liberté d'adapter le processus à vos besoins.

Le diagramme suivant illustre les deux utilisations des données de pipeline :

  • Des données sont utilisées pour piloter le processus.
  • Des données sont acquises durant le traitement et jointes aux données pilotes.

Deux étapes de données de pipeline

Sur cette page, nous désignons les données primaires (provenant de la source) par le terme données pilotes et les données secondaires (acquises durant la phase de traitement) par le terme données connexes.

Dans un cas d'utilisation issu du domaine de la finance, les données pilotes peuvent être constituées de quelques centaines de milliers de transactions. Chaque transaction doit être traitée conjointement avec des données du marché. Dans ce cas, les données de marché sont les données connexes. Dans un cas d'utilisation lié au traitement de fichiers médias, les données pilotes peuvent être des fichiers d'images qui nécessitent un traitement, mais n'ont pas besoin d'autres sources de données et n'utilisent donc pas de données connexes.

Considérations de taille pour les données pilotes

Si la taille des éléments des données pilotes est de l'ordre de quelques mégaoctets, traitez-les avec le paradigme Apache Beam habituel en créant un objet PCollection à partir de la source et en envoyant cet objet aux transformations Apache Beam pour traitement.

Si la taille des éléments des données pilotes est plutôt de l'ordre de quelques centaines de mégaoctets ou du gigaoctet, comme c'est le cas pour les fichiers de médias, vous pouvez héberger ces données pilotes dans Cloud Storage. Ensuite, dans l'objet PCollection initial, référencez l'URI de stockage, et uniquement une référence d'URI aux données à utiliser.

Considérations de taille pour les données connexes

Si les données connexes ne dépassent pas quelques centaines de mégaoctets, utilisez une entrée secondaire pour transmettre ces données aux transformations Apache Beam. L'entrée secondaire envoie le paquet de données à chaque nœud de calcul qui en a besoin.

Si la taille des éléments des données connexes est plutôt de l'ordre du gigaoctet ou du téraoctet, utilisez Bigtable ou Cloud Storage pour fusionner les données connexes avec les données pilotes, en fonction de la nature des données. Bigtable est idéal pour les scénarios issus du domaine de la finance, où les données du marché font l'objet d'accès fréquents sous forme de recherches clé-valeur dans Bigtable. Pour plus d'informations sur la conception de votre schéma Bigtable, y compris des recommandations pour l'utilisation de données de séries temporelles, consultez la documentation Bigtable suivante :

Exécuter le code externe

Vous pouvez exécuter du code externe dans Apache Beam de différentes manières.

  • Créer un processus appelé à partir d'un objet DoFn dans une transformation Dataflow.

  • Utiliser JNI avec le SDK Java.

  • Créer un sous-processus directement à partir de l'objet DoFn. Bien que cette approche ne soit pas la plus efficace, elle est fiable et simple à mettre en œuvre. En raison des problèmes potentiels liés à l'utilisation de JNI, cette page explique comment utiliser un appel de sous-processus.

Lorsque vous concevez votre workflow, considérez le pipeline complet de bout en bout. Toute inefficacité dans la façon dont le processus est exécuté est compensée par le fait que le transfert de données de la source jusqu'au récepteur repose sur un unique pipeline. Si vous comparez cette approche à d'autres, pensez à examiner, en plus de ses coûts, la durée d'exécution du pipeline de bout en bout.

Mettre en place les binaires sur les hôtes

Lorsque vous utilisez un langage Apache Beam natif, le SDK Apache Beam transfère automatiquement tout le code requis aux nœuds de calcul. Toutefois, lorsque vous appelez un code externe, vous devez le transférer manuellement.

Fichiers binaires stockés dans des buckets

Pour déplacer le code, procédez comme suit : L'exemple décrit les étapes pour le SDK Java Apache Beam.

  1. Stockez le code externe compilé, ainsi que les informations de version, dans Cloud Storage.
  2. Dans la méthode @Setup, créez un bloc synchronisé pour vérifier si le fichier de code est disponible sur la ressource locale. Plutôt que de mettre en œuvre une vérification physique, vous pouvez confirmer la disponibilité du fichier à l'aide d'une variable statique à la fin du premier thread.
  3. Si le fichier n'est pas disponible, utilisez la bibliothèque cliente de Cloud Storage pour extraire le fichier du bucket Cloud Storage et pour le transférer au nœud de calcul local. Une approche recommandée consiste à utiliser la classe Apache Beam FileSystems pour cette tâche.
  4. Une fois le fichier déplacé, vérifiez que le bit d'exécution est défini sur le fichier de code.
  5. Dans un système de production, vérifiez le hachage des binaires pour vous assurer que le fichier a été copié correctement.

L'utilisation de la fonction Apache Beam filesToStage est aussi une option, mais elle supprime certains avantages au niveau de la capacité de l'exécuteur à empaqueter et déplacer automatiquement votre code Java. De plus, l'appel du sous-processus nécessitant un emplacement de fichier absolu, vous devez faire appel à du code pour déterminer le chemin d'accès aux classes et, par conséquent, l'emplacement du fichier déplacé par filesToStage. Nous ne recommandons donc pas cette approche.

Exécuter les binaires externes

Avant de pouvoir exécuter du code externe, vous devez créer un wrapper pour l'encapsuler. Écrivez ce wrapper dans le même langage que le code externe (par exemple, C++) ou en tant que script shell. Le wrapper vous permet de transmettre des descripteurs de fichier et d'implémenter les optimisations décrites dans la section Concevoir un traitement pour minimiser les cycles de processeur de cette page. Votre wrapper n'a pas besoin d'être sophistiqué. L'extrait de code suivant montre le squelette d'un wrapper en C++.

int main(int argc, char* argv[])
{
    if(argc < 3){
        std::cerr << "Required return file and data to process" << '\n';
        return 1;
    }

    std::string returnFile = argv[1];
    std::string word = argv[2];

    std::ofstream myfile;
    myfile.open (returnFile);
    myfile << word;
    myfile.close();
    return 0;
}

Ce code lit deux paramètres de la liste d'arguments. Le premier paramètre est l'emplacement du fichier de résultats où sont envoyées les données. Le deuxième paramètre représente les données que le code renvoie à l'utilisateur. Dans une implémentation réaliste, ce code ne se limiterait pas à renvoyer "Hello, world!"

Après avoir écrit le code du wrapper, exécutez le code externe en procédant comme suit :

  1. Transmettez les données aux binaires de code externe.
  2. Exécutez les binaires, interceptez les erreurs éventuelles, et consignez les erreurs et les résultats dans les fichiers journaux.
  3. Gérez les informations de journalisation.
  4. Récupérez les données issues du traitement, une fois celui-ci terminé.

Transmettre les données aux binaires

Pour démarrer le processus d'exécution de la bibliothèque, transmettez des données au code C++. C'est à cette étape que vous pouvez bénéficier de l'intégration de Dataflow avec d'autres outils Google Cloud. Un outil tel que Bigtable peut traiter de très grands ensembles de données et gérer les accès avec une simultanéité élevée et une faible latence, ce qui permet à des milliers de cœurs d'accéder simultanément à l'ensemble de données. En outre, Bigtable peut réaliser un pré-traitement des données, ce qui permet de les mettre en forme, les enrichir et les filtrer. Tout ce travail peut être effectué dans des transformations Apache Beam avant l'exécution du code externe.

Pour un système de production, la solution recommandée consiste à utiliser un tampon de protocole pour encapsuler les données d'entrée. Vous pouvez convertir les données d'entrée en octets et les encoder en base64 avant de les transmettre à la bibliothèque externe. Il existe deux manières de transmettre ces données à la bibliothèque externe :

  • Données d'entrée de petite taille. Pour les données de petite taille, qui ne dépassent pas la longueur maximale autorisée par le système pour un argument de commande, transmettez l'argument à la position 2 du processus en cours de construction avec java.lang.ProcessBuilder.
  • Données d'entrée de grande taille. Pour les données de plus grande taille, créez un fichier dont le nom comprend un UUID et qui hébergera les données requises par le processus.

Exécuter le code C++, intercepter les erreurs et journaliser

La capture et la gestion des informations d'erreur constituent un élément essentiel de votre pipeline. Les ressources utilisées par l'exécuteur Dataflow sont éphémères, et il est souvent difficile d'inspecter les fichiers journaux des nœuds de calcul. Vous devez veiller à intercepter et transmettre toutes les informations utiles à la journalisation de l'exécuteur Dataflow, et à stocker les données de journalisation dans un ou plusieurs buckets Cloud Storage.

L'approche recommandée consiste à rediriger stdout et stderr vers des fichiers, ce qui vous permet d'éviter les problèmes de mémoire insuffisante. Par exemple, dans l'exécuteur Dataflow qui appelle le code C++, vous pouvez inclure des lignes telles que :

Java

  import java.lang.ProcessBuilder.Redirect;
  ...
      processbuilder.redirectError(Redirect.appendTo(errfile));
      processbuilder.redirectOutput(Redirect.appendTo(outFile));

Python

# Requires Apache Beam 2.34 or later.
stopping_times, bad_values = (
    integers
    | beam.Map(collatz.total_stopping_time).with_exception_handling(
        use_subprocess=True))

# Write the bad values to a side channel.
bad_values | 'WriteBadValues' >> beam.io.WriteToText(
    os.path.splitext(output_path)[0] + '-bad.txt')

Gérer les informations de journalisation

De nombreux cas d'utilisation impliquent le traitement de millions d'éléments. Un traitement réussi génère des journaux avec pas ou peu de valeur. Vous devez donc prendre une décision stratégique concernant la conservation des données de journalisation. Par exemple, plutôt que de conserver l'intégralité des données de journalisation, envisagez les solutions alternatives suivantes :

  • Si les informations de journalisation provenant des traitements réussis d'éléments ne présentent aucune utilité, ne les conservez pas.
  • Créez une logique d'échantillonnage des données de journaux, par exemple en créant un échantillon toutes les 10 000 entrées. Si le traitement est homogène, par exemple lorsque de nombreuses itérations du code génèrent des entrées de journal essentiellement identiques, cette approche fournit un équilibre efficace entre conservation des données de journalisation et optimisation du traitement.

En cas de défaillance, la quantité de données déchargée dans les journaux peut être considérable. Une stratégie efficace pour traiter de grandes quantités de données dans un journal d'erreurs consiste à lire les premières lignes de l'entrée de journal et à n'envoyer que ces lignes à Cloud Logging. Vous pouvez charger le reste du fichier journal dans des buckets Cloud Storage. Cette approche vous permet de consulter ultérieurement les premières lignes des journaux d'erreurs et, si nécessaire, de vous référer à Cloud Storage pour l'ensemble du fichier.

Il est également utile de vérifier la taille du fichier journal. Si la taille du fichier est égale à zéro, vous pouvez l'ignorer ou enregistrer un simple message de journalisation indiquant que le fichier ne contenait aucune donnée.

Récupérer les données issues du traitement

Il n'est pas recommandé d'utiliser stdout pour renvoyer le résultat du calcul à la fonction DoFn. D'autres éléments de code appelés par votre code C++, et même votre propre code, sont susceptibles d'envoyer des messages à stdout, ce qui pollue le flux stdoutput contenant les données de journalisation. Une bonne pratique recommandée consiste à modifier le code du wrapper C++ pour lui permettre d'accepter un paramètre indiquant où créer le fichier qui stocke la valeur. Idéalement, ce fichier doit être stocké de manière indépendante du langage au moyen de tampons de protocole, ce qui permet au code C++ de renvoyer un objet au code Java ou Python. L'objet DoFn peut lire le résultat directement depuis le fichier et transmettre les informations sur les résultats à son propre appel output.

L'expérience montre qu'il est essentiel d'exécuter des tests unitaires portant sur le processus lui-même. Il est en effet important de mettre en œuvre un test unitaire qui exécute le processus indépendamment du pipeline Dataflow. Le débogage de la bibliothèque peut considérablement gagner en efficacité s'il peut être réalisé de manière autonome, sans nécessiter l'exécution de l'intégralité du pipeline.

Concevoir un traitement pour minimiser les cycles de processeur

L'appel d'un sous-processus entraîne une surcharge. Suivant votre charge de travail, il peut être nécessaire d'effectuer un travail supplémentaire pour réduire le ratio entre le travail réalisé et la charge d'administration liée au démarrage et à l'arrêt du processus.

Dans le cas d'utilisation de fichiers de médias, la taille des éléments des données pilotes peut être exprimée en mégaoctets ou en gigaoctets. Par conséquent, le traitement de chaque élément de données peut prendre plusieurs minutes. Dans ce cas, le coût de l'appel du sous-processus est insignifiant par rapport au temps de traitement total. La meilleure approche à adopter dans cette situation consiste à faire en sorte que chaque élément lance son propre processus.

Toutefois, dans d'autres cas d'utilisation (par exemple, la finance), le traitement proprement dit ne nécessite que de très petites unités de temps CPU (quelques dizaines de millisecondes). Dans ce cas, la surcharge résultant de l'appel du sous-processus est disproportionnée. Une solution à ce problème consiste à utiliser la transformation Apache Beam GroupByKey pour créer des lots de 50 à 100 éléments afin d'alimenter le processus. Par exemple, vous pouvez suivre les étapes suivantes :

  • Dans une fonction DoFn, créez une paire clé-valeur. Si vous traitez des transactions financières, vous pouvez utiliser le numéro de la transaction comme clé. Si vous n'avez pas de numéro unique à utiliser comme clé, vous pouvez générer une somme de contrôle à partir des données et utiliser une fonction modulo pour créer des partitions de 50 éléments.
  • Envoyez la clé à une fonction GroupByKey.create, qui renvoie une collection KV<key,Iterable<data>> contenant les 50 éléments que vous pouvez envoyer au processus.

Limiter le parallélisme des nœuds de calcul

Lorsque vous travaillez avec un langage disponible en natif dans l'exécuteur Dataflow, vous n'avez jamais besoin de réfléchir à ce qu'il advient du nœud de calcul. Dataflow possède de nombreux processus supervisant le contrôle de flux et les threads en mode de traitement par lot ou par flux.

Toutefois, si vous utilisez un langage externe tel que C++, soyez conscient du fait que démarrer des sous-processus sort quelque peu de l'ordinaire. En mode de traitement par lot, l'exécuteur Dataflow utilise un faible ratio "threads de calcul/nombre de processeurs" par rapport au mode de traitement en flux continu. Il est recommandé, en particulier en mode de traitement en flux continu, de créer un sémaphore au sein de votre classe pour pouvoir exercer un contrôle plus direct sur le parallélisme d'un nœud de calcul.

Par exemple, dans le cadre du traitement de fichiers de médias, vous souhaiterez sans doute éviter que des centaines d’éléments de transcodage soient traités en parallèle par un seul nœud de calcul. Dans de tels cas, vous pouvez créer une classe utilitaire fournissant des autorisations à la fonction DoFn pour la tâche exécutée. L'utilisation de cette classe vous permet de prendre le contrôle direct des threads de calcul au sein de votre pipeline.

Utiliser des récepteurs de données à haute capacité dans Google Cloud

Une fois les données traitées, elles sont envoyées à un récepteur de données. Ce récepteur doit pouvoir gérer le volume de résultats généré par votre solution de traitement en grille.

Le diagramme suivant présente certains des récepteurs disponibles dans Google Cloud lorsque Dataflow exécute une charge de travail de calcul en grille.

Récepteurs disponibles dans Google Cloud

Bigtable, BigQuery et Pub/Sub sont tous capables de traiter des flux de données très volumineux. Par exemple, chaque nœud Bigtable peut gérer 10 000 insertions par seconde, chaque insertion pouvant atteindre une taille maximale de 1 K, le tout dans un contexte d'évolutivité horizontale facile à mettre en œuvre. Par conséquent, un cluster Bigtable de 100 nœuds peut absorber chaque seconde 1 000 000 de messages générés par la grille Dataflow.

Gérer les erreurs de segmentation (segfaults)

Lorsque vous utilisez du code C++ dans un pipeline, vous devez décider de la façon de gérer les erreurs de segmentation, car elles ont des conséquences non locales si elles ne sont pas gérées correctement. L'exécuteur Dataflow crée des processus en fonction des besoins en Java, Python ou Go, puis attribue des tâches aux processus sous forme de lots.

Si l'appel au code C++ est effectué à l'aide d'outils à couplage fort, tels que JNI ou Cython, et que le code C++ traite des segfaults, le processus appelant et la machine virtuelle Java (JVM) plantent également. Dans ce scénario, les points de données incorrects ne peuvent pas être interceptés. Pour rendre possible l'interception des points de données incorrects, utilisez un couplage plus lâche, qui exclut les branches de données incorrectes et permet au pipeline de continuer. Cependant, avec un code C++ mature qui a été entièrement testé sur toutes les variantes de données, vous pouvez utiliser des mécanismes tels que Cython.

Étapes suivantes