Déployer un pipeline

Ce document explique en détail comment Dataflow déploie et exécute un pipeline, et aborde des sujets avancés tels que l'optimisation et l'équilibrage de charge. Si vous recherchez un guide par étapes pour créer et déployer votre premier pipeline, utilisez les guides de démarrage rapide de Dataflow pour Java, Python ou les modèles.

Après avoir construit et testé votre pipeline Apache Beam, vous pouvez utiliser le service géré Dataflow pour le déployer et l'exécuter. Une fois sur le service Dataflow, votre code de pipeline devient une tâche Dataflow.

Le service Dataflow gère entièrement les services Google Cloud, tels que Compute Engine et Cloud Storage, pour exécuter votre tâche Dataflow, en créant et en supprimant automatiquement les ressources nécessaires. Il vous offre une grande visibilité sur votre tâche grâce à des outils comme l'interface de surveillance de Dataflow et l'interface de ligne de commande de Dataflow.

Vous pouvez contrôler certains aspects de la manière dont le service Dataflow exécute votre tâche en configurant des paramètres d'exécution dans votre code de pipeline. Par exemple, les paramètres d'exécution spécifient si les étapes de votre pipeline doivent être exécutées sur des machines virtuelles de nœud de calcul, sur le backend du service Dataflow, ou en local.

Outre la gestion des ressources Google Cloud, le service Dataflow exécute et optimise automatiquement de nombreux aspects du traitement parallèle distribué, En voici quelques-uns :

  • Parallélisation et distribution Dataflow partitionne automatiquement vos données et distribue votre code de nœud de calcul aux instances Compute Engine pour un traitement parallèle. Pour en savoir plus, consultez la section Parallélisation et distribution.
  • Optimisation. Dataflow utilise votre code de pipeline pour créer un graphique d'exécution représentant les classes PCollection et les transformations de votre pipeline, et améliore le graphique afin d'optimiser les performances et l'utilisation des ressources. Le service optimise également automatiquement les opérations potentiellement coûteuses, telles que les agrégations de données. Pour en savoir plus, consultez les sections Optimisation de la fusion et Combiner l'optimisation.
  • Fonctions de réglage automatique. Le service Dataflow inclut plusieurs fonctionnalités qui permettent d'ajuster à la volée l'allocation des ressources et le partitionnement des données, par exemple : Autoscaling horizontal ,Autoscaling vertical etRééquilibrage dynamique des tâches. Ces fonctionnalités aident le service Dataflow à exécuter votre tâche aussi rapidement et aussi efficacement que possible.

Cycle de vie du pipeline : du code de pipeline à la tâche Dataflow

Lorsque vous exécutez votre pipeline Dataflow, le service crée à partir du code qui construit votre objet Pipeline un graphique d'exécution comprenant toutes les transformations et leurs fonctions de traitement associées (par exemple, des fonctions DoFn). Cette phase, appelée Temps de construction du graphique, s'exécute localement sur l'ordinateur sur lequel le pipeline est exécuté.

Durant la construction du graphique, Apache Beam exécute localement le code à partir du point d'entrée principal du code du pipeline, en s'arrêtant aux appels à une source, à un récepteur ou à une étape de transformation, puis en convertissant ces appels en nœuds du graphique. Par conséquent, une partie du code du point d'entrée d'un pipeline (méthode main() de Java ou niveau supérieur d'un script Python) s'exécute localement sur la machine qui exécute le pipeline, tandis que le même code déclaré dans une méthode d'objet DoFn s'exécute dans les nœuds de calcul Dataflow.

Durant la construction du graphique, Apache Beam vérifie également que toutes les ressources référencées par le pipeline (telles que les buckets Cloud Storage, les tables BigQuery, et les sujets ou abonnements Pub/Sub) existent et sont accessibles. Cette vérification est effectuée via des appels d'API standards aux services respectifs. Il est donc essentiel que le compte utilisateur utilisé pour exécuter un pipeline dispose d'une connectivité appropriée aux services nécessaires et soit autorisé à appeler leurs API. Avant d'envoyer le pipeline au service Dataflow, Apache Beam recherche également d'autres erreurs et s'assure que le graphique du pipeline ne contient aucune opération non conforme.

Le graphique d'exécution est ensuite converti au format JSON, puis il est transmis au point de terminaison du service Dataflow.

Remarque : La construction du graphique se produit également lorsque vous exécutez votre pipeline localement. Cependant, le graphique n'est pas converti au format JSON ni transmis au service. Au lieu de cela, il est exécuté localement sur la machine sur laquelle vous avez lancé votre programme Dataflow. Consultez la documentation sur la configuration de l'exécution locale pour en savoir plus.

Le service Dataflow valide ensuite le graphique d'exécution JSON. Une fois le graphique validé, il devient une tâche sur le service Dataflow. Vous pouvez consulter votre tâche, son graphique d'exécution, son état et ses informations de journalisation à l'aide de l'interface de surveillance de Dataflow.

Java

Le service Dataflow envoie une réponse à la machine sur laquelle vous avez exécuté votre programme Dataflow. Cette réponse est encapsulée dans l'objet DataflowPipelineJob, qui contient le paramètre jobId de votre tâche Dataflow. Pour surveiller votre tâche, la suivre et résoudre les problèmes la concernant, vous pouvez utiliser le paramètre jobId dans l'interface de surveillance de Dataflow et l'interface de ligne de commande de Dataflow. Consultez la documentation de référence de l'API pour DataflowPipelineJob pour en savoir plus.

Python

Le service Dataflow envoie une réponse à la machine sur laquelle vous avez exécuté votre programme Dataflow. Cette réponse est encapsulée dans l'objet DataflowPipelineResult, qui contient le paramètre job_id de votre tâche Dataflow. Pour surveiller votre tâche, la suivre et résoudre les problèmes la concernant, vous pouvez utiliser le paramètre job_id dans l'interface de surveillance de Dataflow et l'interface de ligne de commande de Dataflow.

Graphique d'exécution

Dataflow crée un graphique d'étapes représentant votre pipeline, en fonction des transformations et des données que vous avez utilisées lors de la construction de votre objet Pipeline. Il s'agit du graphique d'exécution du pipeline.

L'exemple WordCount, inclus dans les SDK Apache Beam, contient une série de transformations permettant de lire, d'extraire, de compter, de formater et d'écrire les mots individuels d'une collection de texte, ainsi que le nombre d'occurrences de chaque mot. Le diagramme suivant montre comment les transformations dans le pipeline WordCount sont développées dans un graphique d'exécution :

Les transformations dans l'exemple de programme WordCount ont été développées dans un graphique d'exécution des étapes que le service Cloud Dataflow doit exécuter.
Figure 1 : Exemple de graphique d'exécution WordCount

Le graphique d'exécution diffère souvent de l'ordre dans lequel vous avez spécifié vos transformations lors de la construction du pipeline. En effet, le service Dataflow effectue diverses optimisations et fusions sur le graphique d'exécution avant de s'exécuter sur des ressources de cloud gérées. Le service Dataflow respecte les dépendances de données lors de l'exécution de votre pipeline. Cependant, les étapes sans dépendance de données entre elles peuvent être exécutées dans n'importe quel ordre.

Vous pouvez consulter le graphique d'exécution non optimisé généré par Dataflow pour votre pipeline lorsque vous sélectionnez votre tâche dans l'interface de surveillance de Dataflow. Pour en savoir plus sur l'affichage des tâches, consultez la section Utiliser l'interface de surveillance Dataflow.

Parallélisation et distribution

Le service Dataflow parallélise la logique de traitement de votre pipeline et la distribue automatiquement aux nœuds de calcul auxquels vous avez attribué votre tâche. Dataflow utilise les abstractions du modèle de programmation pour représenter les fonctions de parallélisation. Par exemple, vos transformations ParDo forcent Dataflow à distribuer automatiquement votre code de traitement (représenté par des fonctions DoFn) à plusieurs nœuds de calcul qui doivent s'exécuter en parallèle.

Structurer votre code d'utilisateur

Vous pouvez considérer votre code DoFn comme de petites entités indépendantes : de nombreuses instances peuvent potentiellement être exécutées sur des machines différentes, sans se connaître. Ainsi, les fonctions pures (qui ne dépendent pas d'un état caché ou externe, qui n'ont pas d'effets secondaires observables et qui sont déterministes) constituent le code idéal pour la nature parallèle et distribuée des fonctions DoFn.

Toutefois, le modèle de fonction pure n'est pas strictement rigide. Les informations d'état ou les données d'initialisation externes peuvent être valides pour DoFn et d'autres objets de fonction, tant que votre code ne dépend pas d'éléments que le service Dataflow ne garantit pas. Lors de la structuration de vos transformations ParDoet de la création de vos fonctions DoFn, tenez compte des recommandations suivantes :

  • Le service Dataflow garantit que chaque élément de votre classe PCollection d'entrée est traité par une instance DoFn exactement une fois.
  • Le service Dataflow ne garantit pas le nombre de fois qu'une fonction DoFn sera appelée.
  • Le service Dataflow ne garantit pas la manière exacte dont les éléments distribués sont regroupés : il ne garantit pas quels éléments sont traités ensemble (le cas échéant).
  • Le service Dataflow ne garantit pas le nombre exact d'instances DoFn qui seront créées au cours de l'exécution d'un pipeline.
  • Le service Dataflow est tolérant aux pannes et peut réessayer votre code plusieurs fois en cas de problèmes avec les nœuds de calcul. Le service Dataflow peut créer des copies de sauvegarde de votre code et peut rencontrer des problèmes d'effets secondaires manuels (par exemple, si votre code s'appuie sur des fichiers temporaires avec des noms qui ne sont pas uniques ou s'il en crée).
  • Le service Dataflow sérialise le traitement des éléments par instance DoFn. Votre code n'a pas besoin d'être entièrement sécurisé. Cependant, tout état partagé entre plusieurs instances DoFn doit être sécurisé.

Consultez la section sur la configuration requise pour les fonctions fournies par l'utilisateur dans la documentation du modèle de programmation pour en savoir plus sur la création de votre code utilisateur.

Traitement des erreurs et des exceptions

Votre pipeline peut générer des exceptions lors du traitement des données. Certaines de ces erreurs sont transitoires (par exemple, difficulté temporaire à accéder à un service externe), mais certaines sont permanentes, telles que les erreurs causées par des données d'entrée corrompues ou impossibles à analyser, ou par des pointeurs vides lors du calcul.

Dataflow traite les éléments sous forme d'ensembles arbitraires et relance l'ensemble complet lorsqu'une erreur est générée pour l'un des éléments qu'il contient. Lors de l'exécution en mode de traitement par lots, les ensembles comprenant un élément défaillant sont relancés quatre fois. Le pipeline échoue complètement lorsqu'un ensemble échoue quatre fois. Lors de l'exécution en mode de traitement par flux, un ensemble comprenant un élément défaillant est relancé indéfiniment, ce qui risque de bloquer votre pipeline de manière permanente.

Les erreurs liées aux nœuds de calcul de démarrage, telles que l'échec de l'installation de packages sur les nœuds de calcul, sont temporaires. Cela risque d'entraîner un nombre de nouvelles tentatives illimitées et de bloquer votre pipeline de manière permanente.

Optimisation de la fusion

Une fois que le format JSON du graphique d'exécution de votre pipeline a été validé, le service Dataflow peut modifier le graphique pour effectuer des optimisations. Ces optimisations peuvent inclure la fusion de plusieurs étapes ou transformations du graphique d'exécution de votre pipeline en une seule étape. La fusion des étapes évite au service Dataflow de matérialiser chaque classe PCollection intermédiaire de votre pipeline, ce qui peut être coûteux en termes de surcharge de mémoire et de traitement.

Bien que toutes les transformations que vous avez spécifiées dans votre construction de pipeline soient exécutées sur le service, elles peuvent être exécutées dans un ordre différent ou dans le cadre d'une transformation fusionnée plus importante pour assurer l'exécution optimale de votre pipeline. Le service Dataflow respecte les dépendances de données entre les étapes du graphique d'exécution, mais les autres étapes peuvent être exécutées dans n'importe quel ordre.

Exemple de fusion

Le diagramme suivant montre comment le service Dataflow peut optimiser et fusionner le graphique d'exécution de l'exemple WordCount inclus avec le SDK Apache Beam pour Java pour une exécution efficace :

Le graphique d'exécution de l'exemple de programme WordCount est optimisé et comporte des étapes fusionnées par le service Cloud Dataflow.
Figure 2 : Exemple de graphique d'exécution optimisé pour WordCount

Empêcher la fusion

Dans votre pipeline, plusieurs situations peuvent vous amener à empêcher le service Dataflow d'effectuer des optimisations de fusion. Dans certains cas, le service Dataflow pourrait mal déterminer le moyen optimal pour fusionner des opérations dans le pipeline, ce qui pourrait limiter la capacité du service à utiliser tous les nœuds de calcul disponibles.

Par exemple, dans le cas d'une opération ParDo "à haute distribution ramifiée", la fusion peut limiter la capacité de Dataflow à optimiser l'utilisation des nœuds de calcul. Avec ce type d'opération, vous pouvez avoir une collection d'entrées avec relativement peu d'éléments, mais l'opération ParDo génère une sortie avec des éléments cent fois, voire mille fois plus nombreux, suivis d'une autre opération ParDo. Si le service Dataflow fusionne ces opérations ParDo, le parallélisme de cette étape est limité au maximum au nombre d'éléments de la collection d'entrée, même si la classe PCollection intermédiaire contient beaucoup plus d'éléments.

Vous pouvez empêcher une telle fusion en ajoutant à votre pipeline une opération qui oblige le service Dataflow à matérialiser votre classe PCollection intermédiaire. Envisagez d'utiliser l'une des opérations suivantes :

  • Vous pouvez insérer une opération GroupByKey et dissocier le groupe après votre première opération ParDo. Le service Dataflow ne fusionne jamais les opérations ParDo sur une agrégation.
  • Vous pouvez transmettre votre classe PCollection intermédiaire en tant qu'entrée secondaire à une autre opération ParDo. Le service Dataflow matérialise toujours les entrées secondaires.
  • Vous pouvez insérer une étape Reshuffle. Reshuffle empêche la fusion, contrôle les données et effectue la déduplication des enregistrements. Le rebrassage des données est pris en charge par Dataflow, même s'il est marqué comme obsolète dans la documentation d'Apache Beam.

Surveiller la fusion

Vous pouvez accéder à votre graphique optimisé et à vos étapes fusionnées en appelant project.locations.jobs.get ou en exécutant la commande gcloud suivante :

  gcloud dataflow jobs describe --full $JOB_ID --format json

Les étapes fusionnées sont décrites sous l'objet ExecutionStageSummary dans le tableau ComponentTransform du fichier de réponse de sortie. Vous pouvez rediriger la sortie vers jq pour extraire facilement les bits correspondants à l'aide de la commande gcloud suivante :

  gcloud dataflow jobs describe --full $JOB_ID --format json | jq '.pipelineDescription.executionPipelineStage[] | {"stage_id": .id, "stage_name": .name, "fused_steps": .componentTransform }'

Combiner l'optimisation

Les opérations d'agrégation forment un concept important dans le traitement de données à grande échelle. L'agrégation regroupe des données conceptuellement très éloignées, afin de les rendre extrêmement utiles pour la corrélation. Le modèle de programmation de Dataflow représente les opérations d'agrégation sous la forme des transformations GroupByKey, CoGroupByKey et Combine.

Les opérations d'agrégation de Dataflow combinent des données sur l'intégralité de l'ensemble de données, y compris des données pouvant être réparties sur plusieurs nœuds de calcul. Lors de telles opérations d'agrégation, il est souvent plus efficace de combiner le plus de données possible localement avant de combiner des données sur plusieurs instances. Lorsque vous appliquez une opération GroupByKey ou une autre transformation d'agrégation, le service Dataflow effectue automatiquement une combinaison locale partielle avant l'opération de regroupement principale.

Lors d'une combinaison partielle ou à plusieurs niveaux, le service Dataflow prend différentes décisions selon que votre pipeline utilise des données par lot ou par flux. Pour les données limitées, le service favorise l'efficacité et effectue autant que possible des combinaisons localement. Pour les données non limitées, le service favorise la faible latence et peut éviter d'effectuer des combinaisons partielles, qui risquent d'augmenter la latence.

Fonctions de réglage automatique

Le service Dataflow comprend plusieurs fonctionnalités de réglage automatique qui permettent d'optimiser encore plus dynamiquement votre tâche Dataflow pendant son exécution. Ces fonctionnalités incluent l'autoscaling horizontal, l'autoscaling vertical et le rééquilibrage dynamique des tâches.

Autoscaling horizontal

Lorsque l'autoscaling horizontal est activé, le service Dataflow choisit automatiquement le nombre approprié d'instances de nœuds de calcul requises pour exécuter votre tâche. Selon les caractéristiques de la tâche, le service Dataflow peut également réaffecter de façon dynamique plus ou moins de nœuds de calcul pendant l'exécution. Certaines parties de votre pipeline peuvent demander davantage de temps de calcul que d'autres, et le service Dataflow peut générer automatiquement des tâches supplémentaires au cours de ces phases de votre tâche (et les arrêter lorsqu'elles ne sont plus nécessaires).

Java

L'autoscaling horizontal est activé par défaut sur les tâches en flux continu qui utilisent Streaming Engine, ainsi que sur toutes les tâches par lot. Vous pouvez le désactiver en spécifiant explicitement l'option --autoscalingAlgorithm=NONE lorsque vous exécutez votre pipeline. Dans ce cas, notez que le service Dataflow définit le nombre de nœuds de calcul en fonction de l'option --numWorkers, dont la valeur par défaut est 3.

Lorsque l'autoscaling horizontal est activé, le service Dataflow ne permet pas à l'utilisateur de contrôler le nombre exact d'instances de nœuds de calcul allouées à votre tâche. Vous pouvez toujours limiter le nombre de nœuds de calcul en spécifiant l'option --maxNumWorkers lorsque vous exécutez votre pipeline.

Pour les tâches par lots, l'indicateur --maxNumWorkers est facultatif. La valeur par défaut est 1000. Pour les tâches en streaming utilisant Streaming Engine, l'indicateur --maxNumWorkers est facultatif. La valeur par défaut est 100. Pour les tâches en streaming qui n'utilisent pas Streaming Engine, l'indicateur --maxNumWorkers est obligatoire.

Python

L'autoscaling horizontal est activé par défaut sur toutes les tâches Dataflow de traitement par lot créées à l'aide du SDK Apache Beam pour Python version 0.5.1 ou ultérieure. Vous pouvez le désactiver en spécifiant explicitement l'option --autoscaling_algorithm=NONE lorsque vous exécutez votre pipeline. Dans ce cas, notez que le service Dataflow définit le nombre de nœuds de calcul en fonction de l'option --num_workers, dont la valeur par défaut est 3.

Lorsque l'autoscaling horizontal est activé, le service Dataflow ne vous permet pas de contrôler le nombre exact d'instances de nœuds de calcul allouées à votre tâche. Vous pouvez toujours limiter le nombre de nœuds de calcul en spécifiant l'option --max_num_workers lorsque vous exécutez votre pipeline.

Pour les tâches par lots, l'indicateur --max_num_workers est facultatif. La valeur par défaut est 1000. Pour les tâches en streaming utilisant Streaming Engine, l'indicateur --max_num_workers est facultatif. La valeur par défaut est 100. Pour les tâches en streaming qui n'utilisent pas Streaming Engine, l'indicateur --max_num_workers est obligatoire.

Dataflow effectue le scaling en fonction du parallélisme d'un pipeline. Le parallélisme d'un pipeline est une estimation du nombre de threads nécessaires pour traiter les données le plus efficacement possible à tout moment.

Autoscaling par lots

Pour les pipelines par lots, Dataflow choisit automatiquement le nombre de nœuds de calcul en fonction de l'estimation du volume total de travail pour chaque étape de votre pipeline, qui dépend de la taille d'entrée et du débit actuel. Dataflow réévalue le volume de travail en fonction de la progression toutes les 30 secondes, et augmente ou réduit de manière dynamique le nombre de nœuds de calcul à mesure que le nombre total de tâches augmente ou diminue.

Si l'une des conditions suivantes survient, Dataflow conserve ou réduit le nombre de nœuds de calcul pour économiser les ressources en cas d'inactivité :

  • L'utilisation moyenne du processeur du nœud de calcul est inférieure à 5 %.
  • Le parallélisme est limité en raison d'une tâche impossible à charger en parallèle, comme par exemple des données non partitionnées telles que des fichiers compressés ou des données traitées par des modules d'E/S non divisibles.
  • Le facteur de parallélisme est fixe, comme par exemple pour l'écriture dans des fichiers existants sur une destination Cloud Storage.
  • Si votre pipeline utilise une source de données personnalisée que vous avez mise en œuvre, plusieurs méthodes vous permettent de fournir davantage d'informations à l'algorithme d'autoscaling horizontal du service Dataflow afin d'améliorer les performances :

    Java

    • Dans votre sous-classe BoundedSource, mettez en œuvre la méthode getEstimatedSizeBytes. Le service Dataflow utilise getEstimatedSizeBytes lors du calcul du nombre initial de nœuds de calcul à utiliser pour votre pipeline.
    • Dans votre sous-classe BoundedReader, mettez en œuvre la méthode getFractionConsumed. Le service Dataflow utilise getFractionConsumed pour suivre la progression de la lecture et converger vers le nombre correct de nœuds de calcul à utiliser lors d'une lecture.

    Python

    • Dans votre sous-classe BoundedSource, mettez en œuvre la méthode estimate_size. Le service Dataflow utilise estimate_size lors du calcul du nombre initial de nœuds de calcul à utiliser pour votre pipeline.
    • Dans votre sous-classe RangeTracker, mettez en œuvre la méthode fraction_consumed. Le service Dataflow utilise fraction_consumed pour suivre la progression de la lecture et converger vers le nombre correct de nœuds de calcul à utiliser lors d'une lecture.

    Autoscaling de flux

    L'autoscaling de flux permet au service Dataflow de modifier de manière adaptative le nombre de nœuds de calcul utilisés pour exécuter votre pipeline de traitement par flux en réponse aux changements d'utilisation des ressources et de charge. L'autoscaling de flux est gratuit et conçu pour réduire le coût des ressources utilisées lors de l'exécution de pipelines de flux.

    L'autoscaling de flux détermine quand effectuer le scaling en surveillant le temps d'attente estimé. Le temps d'attente estimé est calculé à partir du débit et du traitement des octets en attente à traiter à partir de la source d'entrée. Un pipeline est considéré comme en attente lorsque le temps d'attente estimé est supérieur à 15 secondes.

  • Scaling à la hausse : si un pipeline de traitement par flux reste en retard et que les nœuds de calcul utilisent en moyenne plus de 20 % de leurs ressources de processeur, Dataflow effectue un scaling à la hausse pendant quelques minutes. Dataflow a pour objectif de rattraper le retard dans les 150 secondes environ après le scaling à la hausse, en fonction du débit actuel par nœud de calcul.
  • Scaling à la baisse : si le retard d'un pipeline de traitement par flux est inférieur à 10 secondes et que les nœuds de calcul utilisent en moyenne moins de 75 % de leurs ressources de processeur, Dataflow effectue un scaling à la baisse pendant quelques minutes. Après un scaling à la baisse, les nœuds de calcul utilisent en moyenne 75 % de leurs processeurs. Dans les tâches de streaming qui n'utilisent pas Streaming Engine, il est parfois impossible d'atteindre l'utilisation de 75% du processeur en raison de la distribution du disque (chaque nœud de calcul doit ont le même nombre de disques persistants) et une utilisation de processeur inférieure est utilisée. Par exemple, une tâche configurée pour utiliser un maximum de 100 nœuds de calcul (avec un disque par nœud de calcul) peut être réduite à 50 nœuds de calcul (avec 2 disques par nœud de calcul). Pour cette tâche, il n'est pas possible d'atteindre une utilisation du processeur de 75 %, car la prochaine mise à l'échelle de 100 nœuds de calcul serait de 50 nœuds, soit un nombre inférieur aux 75 nœuds de calcul requis. Par conséquent, Dataflow ne réduit pas cette tâche, ce qui entraîne une utilisation du processeur inférieure à 75 %.
  • Pas de scaling : s'il n'y a pas de tâches en attente mais que l'utilisation du processeur est supérieure ou égale à 75 %, le pipeline ne réduit pas le nombre d'instances. En cas de retard alors que l'utilisation du processeur est inférieure à 20 %, le pipeline ne fait pas non plus évoluer le scaling.
  • Scaling prédictif : le moteur Streaming Engine utilise également une technique d'autoscaling horizontal prédictif basée sur les timers en attente. Dans le modèle Dataflow, les données illimitées d'un pipeline de streaming sont divisées en fenêtres regroupées par horodatage. À la fin d'une fenêtre, des timers (minuteurs) se déclenchent pour chaque clé en cours de traitement dans cette fenêtre. Le déclenchement d'un timer indique que la fenêtre a expiré pour une clé donnée. Le moteur Streaming Engine est capable de quantifier les timers en attente, ce qui signifie qu'il peut prédire le nombre de minuteurs qui se déclencheront à la fin d'une fenêtre. L'utilisation des timers en attente en tant que signal permet à Dataflow de prévoir l'avenir en estimant le volume de traitement qui sera nécessaire lors du déclenchement des timers à venir. Dataflow s'adapte automatiquement et à l'avance suivant la charge future estimée afin de répondre à la demande prévue.
  • Sans autoscaling horizontal, vous devez choisir un nombre fixe de nœuds de calcul en spécifiant numWorkers ou num_workers pour exécuter le pipeline. La charge de travail d'entrée varie dans le temps. Ce nombre peut donc devenir trop élevé ou trop faible. Le provisionnement d'un trop grand nombre de nœuds de calcul entraîne des coûts supplémentaires inutiles, tandis que le provisionnement d'un trop petit nombre de nœuds de calcul entraîne une latence plus élevée pour les données traitées. Avec l'activation de l'autoscaling horizontal, les ressources ne sont utilisées que lorsqu'elles sont nécessaires.

    L'objectif de l'autoscaling horizontal des pipelines de traitement par flux est de minimiser les tâches en attente tout en maximisant l'utilisation et le débit des nœuds de calcul, ainsi que de réagir rapidement aux pics de charge. En activant l'autoscaling horizontal, vous n'avez pas à choisir entre le provisionnement pour les pics de charge ou des résultats actualisés. Les nœuds de calcul sont ajoutés à mesure que l'utilisation du processeur et le traitement en attente augmentent, ou supprimés au fur et à mesure que ces métriques diminuent. Ainsi, vous ne payez que pour ce dont vous avez besoin, et la tâche est traitée aussi efficacement que possible.

    Java

    Sources illimitées personnalisées

    Si votre pipeline utilise une source personnalisée sans limite, la source doit informer le service Dataflow des tâches en attente. Le traitement en attente est une estimation de l'entrée en octets qui n'a pas encore été traitée par la source. Pour informer le service du traitement en attente, mettez en œuvre l'une des deux méthodes suivantes dans votre classe UnboundedReader :

    • getSplitBacklogBytes() : traitement en attente pour la répartition actuelle de la source. Le service regroupe les traitements en attente dans toutes les répartitions.
    • getTotalBacklogBytes() : traitement en attente global sur toutes les répartitions. Dans certains cas, le traitement en attente n'est pas disponible pour chaque répartition et ne peut être calculé que sur toutes les répartitions. Seule la première répartition (ID de répartition "0") doit fournir le traitement en attente total.
    Le dépôt Apache Beam contient plusieurs exemples de sources personnalisées qui mettent en œuvre la classe UnboundedReader.
    Activer l'autoscaling de flux

    Pour les tâches de traitement par flux utilisant Streaming Engine, l'autoscaling horizontal est activé par défaut.

    Afin d'activer l'autoscaling horizontal pour les tâches qui n'utilisent pas Streaming Engine, définissez les paramètres d'exécution suivants lorsque vous démarrez votre pipeline :

    --autoscalingAlgorithm=THROUGHPUT_BASED
    --maxNumWorkers=N
    

    Pour les tâches de traitement par flux qui n'utilisent pas Streaming Engine, le nombre minimal de nœuds de calcul correspond à 1/15e de la valeur --maxNumWorkers (arrondi).

    Les pipelines de traitement par flux sont déployés avec un pool fixe de disques persistants, d'un nombre égal à --maxNumWorkers. Tenez-en compte lorsque vous spécifiez --maxNumWorkers, et assurez-vous que cette valeur représente un nombre suffisant de disques pour votre pipeline.

    Utilisation et tarifs

    L'utilisation de Compute Engine est basée sur le nombre moyen de nœuds de calcul, tandis que l'utilisation des disques persistants est basée sur le nombre exact de --maxNumWorkers. Les disques persistants sont redistribués de manière à ce que chaque nœud de calcul ait un nombre égal de disques associés.

    Dans l'exemple ci-dessus, où --maxNumWorkers=15, vous payez entre 1 et 15 instances de Compute Engine et exactement 15 disques persistants.

    Python

    Activer l'autoscaling de flux

    Pour activer l'autoscaling horizontal, définissez les paramètres d'exécution suivants lorsque vous démarrez votre pipeline :

    --autoscaling_algorithm=THROUGHPUT_BASED
    --max_num_workers=N
    

    Pour les tâches de traitement par flux qui n'utilisent pas Streaming Engine, le nombre minimal de nœuds de calcul correspond à 1/15e de la valeur --maxNumWorkers (arrondi).

    Les pipelines de traitement par flux sont déployés avec un pool fixe de disques persistants, d'un nombre égal à --maxNumWorkers. Tenez-en compte lorsque vous spécifiez --maxNumWorkers, et assurez-vous que cette valeur représente un nombre suffisant de disques pour votre pipeline.

    Utilisation et tarifs

    L'utilisation de Compute Engine est basée sur le nombre moyen de nœuds de calcul, tandis que l'utilisation des disques persistants est basée sur le nombre exact de --max_num_workers. Les disques persistants sont redistribués de manière à ce que chaque nœud de calcul ait un nombre égal de disques associés.

    Dans l'exemple ci-dessus, où --max_num_workers=15, vous payez entre 1 et 15 instances de Compute Engine et exactement 15 disques persistants.

    Scaling manuel d'un pipeline de traitement par flux

    En attendant que l'autoscaling horizontal soit disponible globalement en mode de traitement par flux, vous pouvez utiliser une solution de contournement pour faire évoluer manuellement le nombre de nœuds de calcul exécutant votre pipeline de flux à l'aide de la fonctionnalité de mise à jour de Dataflow.

    Java

    Pour redimensionner votre pipeline de flux lors de l'exécution, veillez à définir les paramètres d'exécution suivants lorsque vous démarrez votre pipeline :

    • Définissez --maxNumWorkers sur le nombre maximal de nœuds de calcul que vous souhaitez affecter à votre pipeline.
    • Définissez --numWorkers sur le nombre initial de nœuds de calcul que votre pipeline doit utiliser lorsqu'il commence à s'exécuter.

    Une fois votre pipeline en cours d'exécution, vous pouvez le mettre à jour et spécifier un nouveau nombre de nœuds de calcul à l'aide du paramètre --numWorkers. La valeur que vous définissez pour le nouveau paramètre --numWorkers doit être comprise entre N et --maxNumWorkers, où N est égal à --maxNumWorkers / 15.

    La fonction de mise à jour remplacera votre tâche en cours d'exécution par une nouvelle tâche, en utilisant le nouveau nombre de nœuds de calcul, tout en préservant toutes les informations d'état associées à la tâche précédente.

    Python

    Pour redimensionner votre pipeline de flux lors de l'exécution, veillez à définir les paramètres d'exécution suivants lorsque vous démarrez votre pipeline :

    • Définissez --max_num_workers sur le nombre maximal de nœuds de calcul que vous souhaitez affecter à votre pipeline.
    • Définissez --num_workers sur le nombre initial de nœuds de calcul que votre pipeline doit utiliser lorsqu'il commence à s'exécuter.

    Une fois votre pipeline en cours d'exécution, vous pouvez le mettre à jour et spécifier un nouveau nombre de nœuds de calcul à l'aide du paramètre --num_workers. La valeur que vous définissez pour le nouveau paramètre --num_workers doit être comprise entre N et --max_num_workers, où N est égal à --max_num_workers / 15.

    La fonction de mise à jour remplacera votre tâche en cours d'exécution par une nouvelle tâche, en utilisant le nouveau nombre de nœuds de calcul, tout en préservant toutes les informations d'état associées à la tâche précédente.

    Autoscaling vertical pour les pipelines de streaming Python

    L'autoscaling vertical est une fonctionnalité qui permet à Dataflow Prime de procéder au scaling dynamique de la mémoire disponible pour les nœuds de calcul en fonction des exigences de la tâche. Cette fonctionnalité est conçue pour rendre les tâches résilientes aux erreurs de mémoire insuffisante et optimiser l'efficacité du pipeline. Dataflow Prime surveille votre pipeline, détecte les situations où les nœuds de calcul manquent ou dépassent la mémoire disponible, puis les remplace par de nouveaux nœuds de calcul disposant de plus ou de moins de mémoire.

    L'autoscaling vertical est activé lorsque vous activez Dataflow Prime.

    Limites de l'aperçu

    • Seules les tâches Python en streaming peuvent faire l'objet d'un scaling vertical.
    • Seule la mémoire des VM des nœuds de calcul peut être mise à l'échelle verticalement.
    • L'augmentation de la mémoire est limitée à 8 Gio maximum pour chaque processeur virtuel. Lorsque vous utilisez des GPU, l'augmentation de la mémoire est limité à 6,5 Gio maximum pour chaque processeur virtuel.
    • La réduction de la mémoire est limitée à 3 Gio minimum pour chaque processeur virtuel.

    Surveiller l'autoscaling vertical

    Les opérations d'autoscaling vertical sont publiées dans les journaux des tâches et des nœuds de calcul. Pour afficher ces journaux, consultez la page Utiliser l'interface de surveillance Dataflow.

    Effet sur l'autoscaling horizontal

    Dans Dataflow Prime, l'autoscaling vertical fonctionne avec l'autoscaling horizontal. Cette combinaison permet à Dataflow Prime d'adapter facilement les nœuds de calcul en fonction des besoins de votre pipeline et de maximiser l'utilisation de la capacité de calcul.

    De par sa conception, l'autoscaling vertical (qui ajuste la mémoire de la VM) se produit à une fréquence plus faible que l'autoscaling horizontal (qui ajuste le nombre de VM). L'autoscaling horizontal est désactivé pendant et jusqu'à 10 minutes après le déclenchement d'une mise à jour par l'autoscaling vertical. S'il existe un nombre important de données d'entrée en attente après ce délai de 10 minutes, l'autoscaling horizontal risque de se produire. Pour en savoir plus sur l'autoscaling horizontal pour les pipelines de traitement par flux, consultez la page Autoscaling des flux.

    Dépannage

    Cette section fournit des instructions permettant de résoudre des problèmes courants liés à l'autoscaling vertical.

    L'autoscaling vertical ne semble pas fonctionner. Que dois-je vérifier ?
    • Vérifiez que votre tâche est une tâche de streaming Python. L'autoscaling vertical n'est pas disponible pour les autres types de tâches.
    • Vérifiez que vous avez activé l'API Cloud Autoscaling pour votre projet Google Cloud.
    • Vérifiez que votre tâche exécute Dataflow Prime. Pour en savoir plus, consultez la page Activer Dataflow Prime.
    La tâche observe un nombre élevé de filigranes et de tâches en attente. Que dois-je vérifier ?

    Si le remodelage vertical des nœuds de calcul prend plus de quelques minutes, votre tâche peut présenter un nombre important de données d'entrée et un filigrane élevé. Pour résoudre ce problème, nous vous recommandons vivement d'utiliser des conteneurs personnalisés, car ils peuvent améliorer la latence pouvant résulter du remodelage des nœuds de calcul. Si le problème persiste après l'utilisation de conteneurs personnalisés, contactez le service client.

    Rééquilibrage dynamique des tâches

    La fonctionnalité de rééquilibrage dynamique des tâches du service Dataflow permet au service de repartitionner le travail de manière dynamique en fonction des conditions d'exécution. Ces conditions peuvent être les suivantes :

    • Déséquilibres dans les attributions de tâches
    • Nœuds de calcul nécessitant plus de temps que prévu pour terminer
    • Nœuds de calcul nécessitant moins de temps que prévu pour terminer

    Le service Dataflow détecte automatiquement ces conditions et peut réattribuer le travail de manière dynamique à des nœuds de calcul inutilisés ou sous-utilisés afin de réduire le temps de traitement global de votre tâche.

    Limites

    Le rééquilibrage dynamique du travail a lieu uniquement lorsque le service Dataflow traite certaines données d'entrée en parallèle : lors de la lecture de données à partir d'une source d'entrée externe, lors de l'utilisation d'une classe PCollection intermédiaire matérialisée, ou lors de l'utilisation du résultat d'une agrégation, comme GroupByKey. Si de nombreuses étapes de votre tâche sont fusionnées, votre tâche contient moins de classes PCollection intermédiaires, et le rééquilibrage dynamique du travail est limité au nombre d'éléments dans la classe PCollection matérialisée source. Si vous voulez vous assurer que le rééquilibrage dynamique du travail peut être appliqué à une classe PCollection particulière de votre pipeline, vous pouvez empêcher la fusion de différentes manières afin de garantir un parallélisme dynamique.

    Le rééquilibrage dynamique du travail ne peut pas remettre en parallèle des données plus fines qu'un seul enregistrement. Si vos données contiennent des enregistrements individuels qui entraînent des temps de traitement importants, elles peuvent retarder votre tâche, car Dataflow ne peut pas subdiviser et redistribuer un enregistrement "à chaud" individuel sur plusieurs nœuds de calcul.

    Java

    Si vous avez défini un nombre fixe de segments pour la sortie finale de votre pipeline (par exemple, en écrivant des données à l'aide de TextIO.Write.withNumShards), la parallélisation est limitée en fonction du nombre de segments que vous avez choisi.

    Python

    Si vous avez défini un nombre fixe de segments pour la sortie finale de votre pipeline (par exemple, en écrivant des données à l'aide de beam.io.WriteToText(..., num_shards=...)), Dataflow limitera la parallélisation en fonction du nombre de segments que vous avez choisi.

    La limitation des segments fixes peut être considérée comme temporaire et peut être modifiée dans les futures versions du service Dataflow.

    Travailler avec des sources de données personnalisées

    Java

    Si votre pipeline utilise une source de données personnalisée que vous fournissez, vous devez mettre en œuvre la méthode splitAtFraction pour permettre à votre source d'utiliser la fonctionnalité de rééquilibrage dynamique du travail.

    Python

    Si votre pipeline utilise une source de données personnalisée que vous fournissez, votre RangeTracker doit mettre en œuvre try_claim, try_split, position_at_fraction et fraction_consumed pour permettre à votre source d'utiliser la fonctionnalité de rééquilibrage dynamique du travail.

    Consultez les informations de référence de l'API sur RangeTracker pour en savoir plus.

    Utiliser et gérer des ressources

    Le service Dataflow gère entièrement les ressources dans Google Cloud, tâche par tâche. Cela inclut le démarrage et l'arrêt d'instances Compute Engine (parfois appelées nœuds de calcul ou VM), ainsi que l'accès aux buckets Cloud Storage de votre projet pour la préparation de fichiers d'E/S et temporaires. Toutefois, si votre pipeline interagit avec des technologies de stockage de données Google Cloud telles que BigQuery ou Pub/Sub, vous devez gérer les ressources et les quotas de ces services.

    Dataflow utilise un emplacement fourni par l'utilisateur dans Cloud Storage spécifiquement pour la préparation des fichiers. Cet emplacement est sous votre contrôle, et vous devez vous assurer que sa durée de vie est maintenue aussi longtemps que vos tâches lisent les données qu'il contient. Vous pouvez réutiliser un même emplacement de préparation pour plusieurs exécutions de tâches, car la mise en cache intégrée du SDK peut accélérer le démarrage de vos tâches.

    Tâches

    Vous pouvez exécuter jusqu'à 25 tâches Dataflow simultanées par projet Google Cloud. Toutefois, cette limite peut être augmentée en contactant l'assistance Google Cloud. Pour en savoir plus, consultez la page consacrée aux quotas.

    Le service Dataflow est actuellement limité au traitement des requêtes de tâches JSON d'une taille inférieure ou égale à 20 Mo. La taille de la requête de tâche est spécifiquement liée à la représentation JSON de votre pipeline. Plus le pipeline est grand, plus la requête est importante.

    Pour estimer la taille de la requête JSON de votre pipeline, exécutez-le avec l'option suivante :

    Java

    --dataflowJobFile=< path to output file >
    

    Python

    --dataflow_job_file=< path to output file >
    

    Cette commande écrit une représentation JSON de votre tâche dans un fichier. La taille du fichier sérialisé représente une estimation fiable de la taille de la requête. La taille réelle est toutefois légèrement plus importante en raison de la présence d'informations supplémentaires incluses dans la requête.

    Pour plus d'informations, consultez la page de dépannage sur l'erreur 413 Corps de requête trop long / La taille de la représentation JSON sérialisée du pipeline dépasse la limite autorisée".

    De plus, la taille du graphique de votre tâche ne doit pas dépasser 10 Mo. Pour plus d'informations, consultez la page de dépannage concernant l'erreur "Le graphique de la tâche est trop grand. Veuillez réessayer avec un graphique de tâche plus petit, ou diviser votre tâche en deux plus petites tâches (ou plus)".

    Nœuds de calcul

    Le service Dataflow autorise actuellement un maximum de 1 000 instances Compute Engine par tâche. Pour les tâches par lots, le type de machine par défaut est n1-standard-1. Pour les tâches en streaming, le type de machine par défaut pour les tâches compatibles Streaming Engine est n1-standard-2 et le type de machine par défaut pour les tâches non Streaming Engine est n1-standard-4. Lorsque vous utilisez les types de machines par défaut, le service Dataflow peut allouer jusqu'à 4 000 cœurs par tâche. Si vous avez besoin de davantage de cœurs pour votre tâche, vous pouvez sélectionner un type de machine plus grand.

    Vous pouvez utiliser toutes les familles de types de machines Compute Engine disponibles, ainsi que des types de machines personnalisés. Pour des résultats optimaux, utilisez des types de machine n1. Les types de machines à cœur partagé, tels que les nœuds de calcul des séries f1 et g1, ne sont pas pris en charge dans le cadre du contrat de niveau de service de Dataflow.

    L'utilisation de Dataflow est facturée en fonction du nombre de processeurs virtuels et de Go de mémoire dans les nœuds de calcul. La facturation ne dépend pas de la famille de types de machines. Vous pouvez spécifier un type de machine pour votre pipeline en définissant le paramètre d'exécution approprié au moment de la création du pipeline.

    Java

    Pour modifier le type de machine, définissez l'option --workerMachineType.

    Python

    Pour modifier le type de machine, définissez l'option --worker_machine_type.

    Quota de ressources

    Le service Dataflow vérifie que votre projet Google Cloud dispose du quota de ressources Compute Engine requis pour exécuter votre tâche, à la fois pour démarrer la tâche et pour la faire évoluer vers le nombre maximal d'instances de nœuds de calcul. Votre tâche ne peut pas démarrer si le quota de ressources disponible est insuffisant.

    La fonctionnalité d'autoscaling horizontal de Dataflow est limitée par le quota Compute Engine disponible de votre projet. Si votre tâche possède un quota suffisant au démarrage, mais qu'une autre tâche utilise le reste du quota disponible de votre projet, la première tâche est exécutée, mais ne peut pas être entièrement mise à l'échelle.

    Cependant, le service Dataflow ne gère pas les augmentations de quota pour les tâches dépassant les quotas de ressources de votre projet. Il vous incombe d'effectuer toutes les demandes de quotas de ressources supplémentaires nécessaires. Pour cela, vous pouvez utiliser Google Cloud Console.

    Adresses IP

    Par défaut, Cloud Dataflow attribue des adresses IP publiques et privées aux VM de nœud de calcul. Une adresse IP publique répond à l'un des critères d'accès à Internet, mais elle est également comptabilisée dans votre quota d'adresses IP externes.

    Si vos VM de nœud de calcul n'ont pas besoin d'accéder à l'Internet public, envisagez d'utiliser uniquement des adresses IP internes, qui ne sont pas comptabilisées dans votre quota d'adresses externes. Pour en savoir plus sur la configuration des adresses IP, consultez les ressources suivantes :

    Ressources de disques persistants

    Le service Dataflow est actuellement limité à 15 disques persistants par instance de nœud de calcul lors de l'exécution d'une tâche de traitement par flux. Chaque disque persistant est local sur une machine virtuelle Compute Engine individuelle. Votre tâche ne peut pas comporter plus de nœuds de calcul que de disques persistants. L'allocation minimale de ressources est un ratio de 1:1 entre les nœuds de calcul et les disques.

    Les tâches utilisant Streaming Engine nécessitent des disques de démarrage de 30 Go. Les tâches qui utilisent Dataflow Shuffle nécessitent des disques de démarrage de 25 Go. Pour les tâches qui n'utilisent pas ces offres, la taille par défaut de chaque disque persistant est de 250 Go en mode de traitement par lot et de 400 Go en mode de traitement par flux.

    Zones

    Par défaut, le service Dataflow déploie les ressources Compute Engine dans la zone us-central1-f de la région us-central1. Vous pouvez ignorer ce comportement en spécifiant le paramètre --region. Si vous devez utiliser une zone spécifique pour vos ressources, utilisez le paramètre --zone lors de la création de votre pipeline. Cependant, nous vous recommandons de ne spécifier que la région, mais pas la zone. Cela permet au service Dataflow de sélectionner automatiquement la meilleure zone de la région en fonction de la capacité disponible de la zone au moment de la demande de création de tâche. Pour en savoir plus, consultez la documentation relative aux points de terminaison régionaux.

    Streaming Engine

    Actuellement, l'exécuteur de pipeline Dataflow exécute entièrement les étapes de votre pipeline de traitement par flux sur des machines virtuelles de nœud de calcul et consomme les ressources de processeur, la mémoire et le stockage de disque persistant des nœuds de calcul. La fonctionnalité Streaming Engine de Dataflow transfère l'exécution du pipeline depuis les VM de nœud de calcul vers le backend du service Dataflow.

    Avantages de Streaming Engine

    Le modèle Streaming Engine présente les avantages suivants :

    • Réduction des ressources de processeur, de mémoire et de disques persistants consommées sur les machines virtuelles de calcul. Streaming Engine fonctionne mieux avec des types de machines plus petits (n1-standard-2 plutôt que n1-standard-4) et ne nécessite pas de disque persistant en dehors d'un petit disque de démarrage pour le nœud de calcul, ce qui réduit la consommation de ressources et de quotas.
    • Autoscaling horizontal plus réactif en réponse aux variations du volume de données entrantes. Streaming Engine offre un scaling plus fluide et plus précis des nœuds de calcul.
    • Meilleure compatibilité : vous n'avez pas besoin de redéployer vos pipelines pour appliquer les mises à jour du service.

    La majeure partie de la réduction des ressources de calcul est due au déchargement du travail sur le service Dataflow. Pour cette raison, des frais sont appliqués à l'utilisation de Streaming Engine. Toutefois, le montant total facturé pour des pipelines Dataflow utilisant Streaming Engine devrait rester sensiblement identique au coût des pipelines Dataflow qui n'utilisent pas cette option.

    Utiliser Streaming Engine

    Cette fonctionnalité est disponible dans toutes les régions où Dataflow est disponible. Pour afficher les emplacements disponibles, consultez la page Emplacements Dataflow.

    Java

    Pour utiliser Streaming Engine pour vos pipelines de streaming, spécifiez le paramètre suivant :

    • --enableStreamingEngine si vous utilisez le SDK Apache Beam pour Java en version 2.11.0 ou ultérieure.
    • --experiments=enable_streaming_engine si vous utilisez le SDK Apache Beam pour Java en version 2.10.0.

    Si vous utilisez Dataflow Streaming Engine pour votre pipeline, ne spécifiez pas le paramètre --zone. Au lieu de cela, spécifiez le paramètre --region et définissez sa valeur sur l'une des régions dans lesquelles Streaming Engine est actuellement disponible. Dataflow sélectionne automatiquement la zone dans la région spécifiée. Si vous spécifiez le paramètre --zone en le définissant sur une zone en dehors des régions disponibles, Dataflow signale une erreur.

    Streaming Engine fonctionne mieux avec des types de machines plus petits pour les nœuds de calcul. Nous vous recommandons donc de définir --workerMachineType=n1-standard-2. Vous pouvez également définir --diskSizeGb=30, car Streaming Engine n'a besoin d'espace que pour l'image de démarrage du nœud de calcul et les journaux locaux. Ces valeurs constituent les valeurs par défaut.

    Python

    Streaming Engine est activé par défaut pour les nouveaux pipelines de traitement par flux Dataflow lorsque les conditions suivantes sont remplies :

    Si vous souhaitez désactiver Streaming Engine dans votre pipeline de traitement par flux Python, spécifiez le paramètre suivant :

    --experiments=disable_streaming_engine

    Si vous utilisez Python 2, vous devez toujours activer Streaming Engine en spécifiant le paramètre suivant :

    --enable_streaming_engine

    Si vous utilisez Dataflow Streaming Engine dans votre pipeline, ne spécifiez pas le paramètre --zone. Au lieu de cela, spécifiez le paramètre --region et définissez sa valeur sur l'une des régions dans lesquelles Streaming Engine est actuellement disponible. Dataflow sélectionne automatiquement la zone dans la région spécifiée. Si vous spécifiez le paramètre --zone en le définissant sur une zone en dehors des régions disponibles, Dataflow signale une erreur.

    Streaming Engine fonctionne mieux avec des types de machines plus petits pour les nœuds de calcul. Nous vous recommandons donc de définir --machine_type=n1-standard-2. Vous pouvez également définir --disk_size_gb=30, car Streaming Engine n'a besoin d'espace que pour l'image de démarrage du nœud de calcul et les journaux locaux. Ces valeurs constituent les valeurs par défaut.

    Dataflow Shuffle

    Dataflow Shuffle est l'opération de base derrière les transformations Dataflow telles que GroupByKey, CoGroupByKey et Combine. L'opération Dataflow Shuffle permet de partitionner et de regrouper les données par clé de manière évolutive et efficace tout en bénéficiant d'une tolérance aux pannes. Actuellement, la fonctionnalité de brassage des données dans Dataflow s'exécute entièrement à l'aide de machines virtuelles de nœud de calcul et consomme des ressources telles qu'un processeur de nœud de calcul, de la mémoire et de l'espace de stockage sur disque persistant. La fonctionnalité Dataflow Shuffle basée sur les services, disponible uniquement pour les pipelines de traitement par lot, transfère l'opération de brassage des données depuis les VM de nœud de calcul vers le backend du service Dataflow.

    Les tâches par lot utilisent Dataflow Shuffle par défaut.

    Avantages de Dataflow Shuffle

    La fonctionnalité Dataflow Shuffle basée sur les services présente les avantages suivants :

    • Temps d'exécution plus rapide des pipelines par lots pour la majorité des types de tâches de pipeline.
    • Réduction des ressources de processeur, de mémoire et de disques persistants consommées sur les machines virtuelles de calcul.
    • Meilleur autoscaling horizontal : les VM ne contiennent plus de données aléatoires et peuvent donc faire l'objet d'un scaling à la baisse plus tôt.
    • Meilleure tolérance aux pannes : une machine virtuelle défectueuse contenant des données Dataflow Shuffle n'entraîne pas l'échec de l'ensemble de la tâche, comme cela se produirait si la fonctionnalité n'était pas utilisée.

    La majeure partie de la réduction des ressources de calcul est due au déchargement du travail de brassage des données sur le service Dataflow. Pour cette raison, des frais sont appliqués à l'utilisation de Dataflow Shuffle. Toutefois, le montant total facturé pour des pipelines Dataflow utilisant la mise en œuvre Dataflow basée sur les services devrait être inférieur ou égal au coût des pipelines Dataflow qui n'utilisent pas cette option.

    Pour la majorité des types de tâches de pipeline, Dataflow Shuffle doit s'exécuter plus rapidement que la fonctionnalité de brassage des données qui s'exécute sur les VM de nœud de calcul. Cependant, les temps d'exécution peuvent varier d'une exécution à l'autre. Si vous exécutez un pipeline avec des délais importants, nous vous recommandons d'allouer suffisamment de temps de mémoire tampon avant l'échéance. En outre, envisagez de demander un quota plus important pour Shuffle.

    Exceptions liées à l'utilisation de Dataflow Shuffle

    Les tâches par lot utilisent Dataflow Shuffle par défaut. La taille du disque de démarrage pour chaque tâche par lot est réduite à 25 Go au lieu de la taille par défaut de 250 Go. Pour certaines tâches par lots, vous devrez peut-être désactiver Dataflow Shuffle ou modifier la taille du disque. Réfléchissez aux éléments suivants :

    • Une machine virtuelle de calcul utilise une partie des 25 Go d'espace disque pour le système d'exploitation, les fichiers binaires, les journaux et les conteneurs. Les tâches qui utilisent une quantité importante de disque et qui dépassent la capacité de disque restante peuvent échouer lorsque vous utilisez Dataflow Shuffle.
    • Les tâches qui utilisent beaucoup d'E/S de disque peuvent être lentes en raison des performances du petit disque. Pour plus d'informations sur les différences de performances entre les différentes tailles de disque, consultez la page Performances des disques persistants Compute Engine.

    Pour spécifier une taille de disque plus importante pour une tâche Dataflow Shuffle, vous pouvez utiliser le paramètre --disk_size_gb.

    Consultez la section suivante pour désactiver Dataflow Shuffle.

    Utiliser Dataflow Shuffle

    Cette fonctionnalité est disponible dans toutes les régions où Dataflow est disponible. Pour afficher les emplacements disponibles, consultez la page Emplacements Dataflow. Si vous utilisez Dataflow Shuffle, les nœuds de calcul doivent également être déployés dans la même région que le point de terminaison régional.

    Java

    Les tâches par lot utilisent Dataflow Shuffle par défaut. Pour désactiver l'utilisation de Dataflow Shuffle, spécifiez l'option de pipeline suivante :
    --experiments=shuffle_mode=appliance.

    Si vous utilisez Dataflow Shuffle pour votre pipeline, ne spécifiez pas les options de pipeline zone. Au lieu de cela, spécifiez le paramètre region et définissez sa valeur sur l'une des régions dans lesquelles Shuffle est actuellement disponible. Dataflow sélectionne automatiquement la zone dans la région spécifiée. Si vous spécifiez l'option de pipeline zone en la définissant sur une zone en dehors des régions disponibles, Dataflow signale une erreur. Si vous définissez une combinaison de region et de zoneincompatibles, votre tâche ne peut pas utiliser Dataflow Shuffle.

    Python

    Les tâches par lot utilisent Dataflow Shuffle par défaut. Pour désactiver l'utilisation de Dataflow Shuffle, spécifiez l'option de pipeline suivante :
    --experiments=shuffle_mode=appliance.

    Si vous utilisez Dataflow Shuffle pour votre pipeline, ne spécifiez pas les options de pipeline zone. Au lieu de cela, spécifiez le paramètre region et définissez sa valeur sur l'une des régions dans lesquelles Shuffle est actuellement disponible. Dataflow sélectionne automatiquement la zone dans la région spécifiée. Si vous spécifiez l'option de pipeline zone en la définissant sur une zone en dehors des régions disponibles, Dataflow signale une erreur. Si vous définissez une combinaison de region et de zoneincompatibles, votre tâche ne peut pas utiliser Dataflow Shuffle.

    Planification flexible des ressources dans Dataflow

    La fonctionnalité FlexRS de Dataflow réduit le coût du traitement par lot en s'appuyant sur des techniques de planification avancées, le service Dataflow Shuffle, et une combinaison d'instances de VM préemptives et de VM standards. Grâce à cette exécution combinée de VM préemptives et de VM standards en parallèle, Dataflow améliore l'expérience utilisateur lorsque Compute Engine arrête des instances de VM préemptives lors d'un événement système. FlexRS contribue à garantir que le pipeline continue de progresser et que vous ne perdez pas le travail déjà réalisé lorsque Compute Engine préempte vos VM préemptives. Pour plus d'informations sur FlexRS, consultez la page Utiliser la planification flexible des ressources dans Dataflow.

    Exécuteur Dataflow v2

    L'exécuteur Dataflow de production actuel utilise des nœuds de calcul spécifiques au langage lors de l'exécution de pipelines Apache Beam. Pour améliorer l'évolutivité, la généralisation, l'extensibilité et l'efficacité, l'exécuteur Dataflow passe à une architecture davantage basée sur les services. Ces modifications incluent une architecture de nœuds de calcul plus efficace et portable empaquetée avec le service Shuffle et Streaming Engine.

    L'exécuteur Dataflow v2 est compatible avec les configurations suivantes :

    Java

    L'exécuteur Dataflow v2 nécessite le SDK Apache Beam pour Java en version 2.30.0 ou ultérieure.

    Pour activer l'exécuteur v2, exécutez votre tâche avec l'option suivante : --experiments=use_runner_v2.

    Python

    L'exécuteur Dataflow v2 nécessite le SDK Apache Beam version 2.21.0 ou ultérieure pour Python. L'exécuteur v2 est la version par défaut pour les pipelines de traitement par lot Python (version 2.21.0 ou ultérieure) envoyés à compter du 1er février 2021 et pour les pipelines de traitement par flux Python (version 2.21.0 ou ultérieure). Vous n'avez pas besoin de modifier le code de votre pipeline pour exploiter cette nouvelle architecture.

    Dans certaines circonstances, votre pipeline peut ne pas utiliser l'exécuteur V2, bien qu'il s'exécute sur une version du SDK compatible. Il peut s'agir, par exemple, de tâches de modèle Dataflow ou de tâches où l'option "Grouper par clé" est utilisée comme entrée secondaire. Dans ce cas, vous pouvez choisir d'exécuter la tâche avec l'indicateur --experiments=use_runner_v2.

    Avantages de l'utilisation de l'exécuteur Dataflow v2

    Les nouvelles fonctionnalités ne seront disponibles que dans l'application Dataflow Runner v2. En outre, le renforcement de l'efficacité de l'architecture de l'exécuteur Dataflow v2 peut améliorer les performances de vos tâches Dataflow.

    En utilisant l'exécuteur Dataflow v2, vous remarquerez peut-être une réduction de votre facture.

    L'exécuteur Dataflow v2 vous permet également de précompiler votre conteneur Python, ce qui peut améliorer le temps de démarrage des VM et les performances de l'autoscaling. Pour essayer cette fonctionnalité expérimentale, activez l'API Cloud Build sur votre projet et envoyez le pipeline avec le paramètre suivant :
    --prebuild_sdk_container_engine=cloud_build.

    Une fois la tâche terminée ou arrêtée, vous pouvez supprimer l'image prédéfinie de Container Registry. L'URL de l'image se trouve sur l'interface utilisateur de surveillance de Dataflow, dans Options de pipeline.

    L'exécuteur Dataflow v2 est compatible avec les pipelines multilingues, une fonctionnalité qui permet à votre pipeline Apache Beam d'utiliser des transformations définies dans d'autres SDK Apache Beam. Actuellement, l'exécuteur Dataflow v2 permet d'utiliser des transformations Java à partir d'un pipeline du SDK Python (version Bêta).

    Utiliser l'exécuteur Dataflow v2

    L'exécuteur Dataflow v2 est disponible dans les régions disposant de points de terminaison régionaux Dataflow. Durant cette phase de déploiement, si vous souhaitez essayer Dataflow Runner v2, vous pouvez utiliser le paramètre suivant :
    --experiments=use_runner_v2

    Déboguer les tâches de l'exécuteur Dataflow v2

    Pour déboguer des tâches utilisant l'exécuteur Dataflow v2, vous devez suivre les étapes de débogage standards. Toutefois, tenez compte des points suivants lorsque vous utilisez l'exécuteur Dataflow v2 :

    • Les tâches de l'exécuteur Dataflow v2 exécutent deux types de processus sur la VM de nœud de calcul : le processus du SDK et le processus de test de l'exécuteur. Selon le pipeline et le type de VM, il peut y avoir un ou plusieurs processus du SDK, mais il n'y a qu'un seul processus de test de l'exécuteur par VM.
    • Les processus du SDK exécutent le code utilisateur et d'autres fonctions spécifiques au langage, tandis que le processus de test de l'exécuteur gère tout le reste.
    • Le processus de test de l'exécuteur attend que tous les processus du SDK s'y connectent avant de commencer à demander du travail à partir de Dataflow.
    • Les tâches peuvent être retardées si la VM de nœud de calcul télécharge et installe des dépendances au démarrage des processus du SDK. En cas de problème dans un processus du SDK, tel que le démarrage ou l'installation de bibliothèques, le nœud de calcul signale son état comme non opérationnel. Si les temps de démarrage augmentent, activez l'API Cloud Build sur votre projet et envoyez le pipeline avec le paramètre suivant :
      --prebuild_sdk_container_engine=cloud_build.
    • Les journaux de la VM du nœud de calcul, disponibles via l'explorateur de journaux ou l'interface de surveillance de Dataflow, incluent les journaux du processus d'exploitation d'exécuteur ainsi que les journaux des processus SDK.
    • Pour diagnostiquer les problèmes dans votre code utilisateur, examinez les journaux de nœud de calcul des processus du SDK. Si vous trouvez des erreurs dans les journaux de test de l'exécuteur, veuillez contacter l'assistance pour signaler un bug.

    VM protégée par Dataflow

    Vous pouvez indiquer au service Dataflow d'utiliser des nœuds de calcul pour les VM protégées. Pour en savoir plus sur les fonctionnalités de VM protégées, consultez la page VM protégée.

    Java

    Pour utiliser les nœuds de calcul de VM protégée, spécifiez le paramètre --dataflowServiceOptions=enable_secure_boot.

    Python

    Pour utiliser les nœuds de calcul de VM protégée, spécifiez le paramètre --dataflow_service_options=enable_secure_boot.