Déployer un pipeline

Après avoir construit et testé votre pipeline Apache Beam, vous pouvez utiliser le service géré Cloud Dataflow pour le déployer et l'exécuter. Une fois sur le service Cloud Dataflow, votre code de pipeline devient une tâche Cloud Dataflow.

Le service Cloud Dataflow gère entièrement les services GCP (Google Cloud Platform), tels que Compute Engine et Cloud Storage, pour exécuter votre tâche Cloud Dataflow, en créant et en supprimant automatiquement les ressources nécessaires. Il vous offre une grande visibilité sur votre tâche grâce à des outils comme l'interface de surveillance de Cloud Dataflow et l'interface de ligne de commande de Cloud Dataflow.

Vous pouvez contrôler certains aspects de la manière dont Cloud Dataflow exécute votre tâche en configurant des paramètres d'exécution dans votre code de pipeline. Par exemple, les paramètres d'exécution spécifient si les étapes de votre pipeline doivent être exécutées sur des machines virtuelles de calcul, sur le backend du service Cloud Dataflow, ou en local.

Outre la gestion des ressources GCP, le service Cloud Dataflow exécute et optimise automatiquement de nombreux aspects du traitement parallèle distribué, y compris :

  • Parallélisation et distribution. Cloud Dataflow partitionne automatiquement vos données et distribue votre code de calcul aux instances Compute Engine pour un traitement parallèle.
  • Optimisation. Cloud Dataflow utilise votre code de pipeline pour créer un graphique d'exécution représentant les classes PCollection et les transformations de votre pipeline, et améliore le graphique afin d'optimiser les performances et l'utilisation des ressources. Le service optimise également automatiquement les opérations potentiellement coûteuses, telles que les agrégations de données.
  • Fonctions de réglage automatique. Le service Cloud Dataflow comprend plusieurs fonctionnalités qui permettent un ajustement à la volée de l'allocation des ressources et du partitionnement des données, telles que l'autoscaling et le rééquilibrage dynamique du travail. Ces fonctionnalités aident le service Cloud Dataflow à exécuter votre tâche aussi rapidement et aussi efficacement que possible.

Cycle de vie du pipeline : du code de pipeline à la tâche Cloud Dataflow

Lorsque vous exécutez votre programme Cloud Dataflow, le service crée à partir du code qui construit votre objet Pipeline un graphique d'exécution comprenant toutes les transformations et leurs fonctions de traitement associées (par exemple, DoFn). Cette phase s'appelle le temps de construction du graphique. Au cours de la construction du graphique, Cloud Dataflow recherche diverses erreurs et s'assure que le graphique de votre pipeline ne contient aucune opération non conforme. Le graphique d'exécution est converti au format JSON, puis il est transmis au point de terminaison du service Cloud Dataflow.

Remarque : La construction du graphique se produit également lorsque vous exécutez votre pipeline localement. Cependant, le graphique n'est pas converti au format JSON ni transmis au service. Au lieu de cela, il est exécuté localement sur la machine sur laquelle vous avez lancé votre programme Cloud Dataflow. Consultez la documentation sur la configuration de l'exécution locale pour en savoir plus.

Le service Cloud Dataflow valide ensuite le graphique d'exécution JSON. Une fois le graphique validé, il devient une tâche sur le service Cloud Dataflow. Vous pouvez consulter votre tâche, son graphique d'exécution, son statut et ses informations de journalisation à l'aide de l'interface de surveillance de Cloud Dataflow.

Java : SDK 2.x

Le service Cloud Dataflow envoie une réponse à la machine sur laquelle vous avez exécuté votre programme Cloud Dataflow. Cette réponse est encapsulée dans l'objet DataflowPipelineJob, qui contient le paramètre jobId de votre tâche Cloud Dataflow. Pour surveiller votre tâche, la suivre et résoudre les problèmes la concernant, vous pouvez utiliser le paramètre jobId dans l'interface de surveillance de Cloud Dataflow et l'interface de ligne de commande de Cloud Dataflow. Consultez la documentation de référence de l'API pour DataflowPipelineJob pour en savoir plus.

Python

Le service Cloud Dataflow envoie une réponse à la machine sur laquelle vous avez exécuté votre programme Cloud Dataflow. Cette réponse est encapsulée dans l'objet DataflowPipelineResult, qui contient le paramètre job_id de votre tâche Cloud Dataflow. Pour surveiller votre tâche, la suivre et résoudre les problèmes la concernant, vous pouvez utiliser le paramètre job_id dans l'interface de surveillance de Cloud Dataflow et l'interface de ligne de commande de Cloud Dataflow.

Java : SDK 1.x

Le service Cloud Dataflow envoie une réponse à la machine sur laquelle vous avez exécuté votre programme Cloud Dataflow. Cette réponse est encapsulée dans l'objet DataflowPipelineJob, qui contient le paramètre jobId de votre tâche Cloud Dataflow. Pour surveiller votre tâche, la suivre et résoudre les problèmes la concernant, vous pouvez utiliser le paramètre jobId dans l'interface de surveillance de Cloud Dataflow et l'interface de ligne de commande de Cloud Dataflow. Consultez la documentation de référence de l'API pour DataflowPipelineJob pour en savoir plus.

Graphique d'exécution

Cloud Dataflow crée un graphique d'étapes représentant votre pipeline, en fonction des transformations et des données que vous avez utilisées lors de la construction de votre objet Pipeline. Il s'agit du graphique d'exécution du pipeline.

L'exemple WordCount, inclus dans les SDK Apache Beam, contient une série de transformations permettant de lire, d'extraire, de compter, de formater et d'écrire les mots individuels d'une collection de texte, ainsi que le nombre d'occurrences de chaque mot. Le diagramme suivant montre comment les transformations dans le pipeline WordCount sont développées dans un graphique d'exécution :

Les transformations dans l'exemple de programme WordCount ont été développées dans un graphique d'exécution des étapes que le service Cloud Dataflow doit exécuter.
Figure 1 : Exemple de graphique d'exécution WordCount

Le graphique d'exécution diffère souvent de l'ordre dans lequel vous avez spécifié vos transformations lors de la construction du pipeline. En effet, le service Cloud Dataflow effectue diverses optimisations et fusions sur le graphique d'exécution avant de s'exécuter sur des ressources de cloud gérées. Le service Cloud Dataflow respecte les dépendances de données lors de l'exécution de votre pipeline. Cependant, les étapes sans dépendance de données entre elles peuvent être exécutées dans n'importe quel ordre.

Vous pouvez consulter le graphique d'exécution non optimisé généré par Cloud Dataflow pour votre pipeline lorsque vous sélectionnez votre tâche dans l'interface de surveillance de Cloud Dataflow.

Parallélisation et distribution

Le service Cloud Dataflow parallélise la logique de traitement de votre pipeline et la distribue automatiquement aux utilisateurs auxquels vous avez attribué votre tâche. Cloud Dataflow utilise les abstractions du modèle de programmation pour représenter les fonctions de parallélisation. Par exemple, vos transformations ParDo forcent Cloud Dataflow à distribuer automatiquement votre code de traitement (représenté par des fonctions DoFn) à plusieurs nœuds de calcul pour qu'il s'exécute en parallèle.

Structurer votre code d'utilisateur

Vous pouvez considérer votre code DoFn comme de petites entités indépendantes : de nombreuses instances peuvent potentiellement être exécutées sur des machines différentes, sans se connaître. Ainsi, les fonctions pures (qui ne dépendent pas d'un état caché ou externe, qui n'ont pas d'effets secondaires observables et qui sont déterministes) constituent le code idéal pour la nature parallèle et distribuée des fonctions DoFn.

Toutefois, le modèle de fonction pure n'est pas strictement rigide. Les informations d'état ou les données d'initialisation externes peuvent être valides pour DoFn et d'autres objets de fonction, tant que votre code ne dépend pas d'éléments que le service Cloud Dataflow ne garantit pas. Lors de la structuration de vos transformations ParDoet de la création de vos fonctions DoFn, tenez compte des recommandations suivantes :

  • Le service Cloud Dataflow garantit que chaque élément de votre classe PCollection d'entrée est traité par une instance DoFn exactement une fois.
  • Le service Cloud Dataflow ne garantit pas le nombre de fois qu'une fonction DoFn sera invoquée.
  • Le service Cloud Dataflow ne garantit pas exactement la manière dont les éléments distribués sont regroupés : il ne garantit pas quels éléments sont traités ensemble (le cas échéant).
  • Le service Cloud Dataflow ne garantit pas le nombre exact d'instances DoFn qui seront créées au cours de l'exécution d'un pipeline.
  • Le service Cloud Dataflow est tolérant aux pannes et peut réessayer votre code plusieurs fois en cas de problèmes de nœud de calcul. Le service Cloud Dataflow peut créer des copies de sauvegarde de votre code et peut rencontrer des problèmes d'effets secondaires manuels (par exemple, si votre code s'appuie sur des fichiers temporaires avec des noms qui ne sont pas uniques ou s'il en crée).
  • Le service Cloud Dataflow sérialise le traitement des éléments par instance DoFn. Votre code n'a pas besoin d'être entièrement sécurisé. Cependant, tout état partagé entre plusieurs instances DoFn doit être sécurisé.

Consultez la section sur la configuration requise pour les fonctions fournies par l'utilisateur dans la documentation du modèle de programmation pour en savoir plus sur la création de votre code utilisateur.

Traitement des erreurs et des exceptions

Votre pipeline peut générer des exceptions lors du traitement des données. Certaines de ces erreurs sont transitoires (par exemple, difficulté temporaire à accéder à un service externe), mais certaines sont permanentes, telles que les erreurs causées par des données d'entrée corrompues ou impossibles à analyser, ou par des pointeurs vides lors du calcul.

Cloud Dataflow traite les éléments sous forme d'ensembles arbitraires et relance l'ensemble complet lorsqu'une erreur est générée pour l'un des éléments qu'il contient. Lors de l'exécution en mode de traitement par lots, les ensembles comprenant un élément défaillant sont relancés quatre fois. Le pipeline échoue complètement lorsqu'un ensemble échoue quatre fois. Lors de l'exécution en mode de traitement par flux, un ensemble comprenant un élément défaillant est relancé indéfiniment, ce qui risque de bloquer votre pipeline de manière permanente.

Optimisation de la fusion

Une fois que le formulaire JSON du graphique d'exécution de votre pipeline a été validé, le service Cloud Dataflow peut modifier le graphique pour effectuer des optimisations. Ces optimisations peuvent inclure la fusion de plusieurs étapes ou transformations du graphique d'exécution de votre pipeline en une seule étape. La fusion des étapes évite au service Cloud Dataflow de matérialiser chaque classe PCollection intermédiaire de votre pipeline, ce qui peut être coûteux en termes de surcharge de mémoire et de traitement.

Bien que toutes les transformations que vous avez spécifiées dans votre construction de pipeline soient exécutées sur le service, elles peuvent être exécutées dans un ordre différent ou dans le cadre d'une transformation fusionnée plus importante pour assurer l'exécution optimale de votre pipeline. Le service Cloud Dataflow respecte les dépendances de données entre les étapes du graphique d'exécution, mais les autres étapes peuvent être exécutées dans n'importe quel ordre.

Exemple de fusion

Le diagramme suivant montre comment le service Cloud Dataflow peut optimiser et fusionner le graphique d'exécution de l'exemple WordCount inclus avec le SDK Apache Beam pour Java pour une exécution efficace :

Le graphique d'exécution de l'exemple de programme WordCount est optimisé et comporte des étapes fusionnées par le service Cloud Dataflow.
Figure 2 : Exemple de graphique d'exécution optimisé pour WordCount

Empêcher la fusion

Dans votre pipeline, plusieurs situations peuvent vous amener à empêcher le service Cloud Dataflow d'effectuer des optimisations de fusion. Dans certains cas, le service Cloud Dataflow pourrait mal déterminer le moyen optimal pour fusionner des opérations dans le pipeline, ce qui pourrait limiter la capacité du service à utiliser tous les nœuds de calcul disponibles.

Par exemple, dans le cas d'une opération ParDo "à haute distribution ramifiée", la fusion peut limiter la capacité de Cloud Dataflow à optimiser l'utilisation des nœuds de calcul. Avec ce type d'opération, vous pouvez avoir une collection d'entrées avec relativement peu d'éléments, mais l'opération ParDo génère une sortie avec des éléments cent fois, voire mille fois plus nombreux, suivis d'une autre opération ParDo. Si le service Cloud Dataflow fusionne ces opérations ParDo, le parallélisme de cette étape est limité au maximum au nombre d'éléments de la collection d'entrée, même si la classe intermédiaire PCollection contient beaucoup plus d'éléments.

Vous pouvez empêcher une telle fusion en ajoutant à votre pipeline une opération qui oblige le service Cloud Dataflow à matérialiser votre classe PCollection intermédiaire. Envisagez d'utiliser l'une des opérations suivantes :

  • Vous pouvez insérer une opération GroupByKey et dissocier le groupe après votre première opération ParDo. Le service Cloud Dataflow ne fusionne jamais les opérations ParDo sur une agrégation.
  • Vous pouvez transmettre votre classe PCollection intermédiaire en tant qu'entrée secondaire à une autre opération ParDo. Le service Cloud Dataflow matérialise toujours les entrées secondaires.

Combiner l'optimisation

Les opérations d'agrégation forment un concept important dans le traitement de données à grande échelle. L'agrégation regroupe des données conceptuellement très éloignées, afin de les rendre extrêmement utiles pour la corrélation. Le modèle de programmation de Cloud Dataflow représente les opérations d'agrégation sous la forme des transformations GroupByKey, CoGroupByKey et Combine.

Les opérations d'agrégation de Cloud Dataflow combinent des données sur l'intégralité de l'ensemble de données, y compris des données pouvant être réparties sur plusieurs nœuds de calcul. Lors de telles opérations d'agrégation, il est souvent plus efficace de combiner le plus de données possible localement avant de combiner des données sur plusieurs instances. Lorsque vous appliquez une opération GroupByKey ou une autre transformation d'agrégation, le service Cloud Dataflow effectue automatiquement une combinaison locale partielle avant l'opération de regroupement principale.

Lors d'une combinaison partielle ou à plusieurs niveaux, le service Cloud Dataflow prend différentes décisions selon que votre pipeline utilise des données par lots ou en flux. Pour les données limitées, le service favorise l'efficacité et effectue autant que possible des combinaisons localement. Pour les données non limitées, le service favorise la faible latence et peut éviter d'effectuer des combinaisons partielles, qui risquent d'augmenter la latence.

Fonctions de réglage automatique

Le service Cloud Dataflow comprend plusieurs fonctionnalités de réglage automatique qui permettent d'optimiser encore plus dynamiquement votre tâche Cloud Dataflow pendant son exécution. Ces fonctions incluent l'autoscaling et le rééquilibrage dynamique des tâches.

Autoscaling

Lorsque l'autoscaling est activé, le service Cloud Dataflow choisit automatiquement le nombre approprié d'instances de nœuds de calcul requises pour exécuter votre tâche. Le service Cloud Dataflow peut également réaffecter de manière dynamique plus ou moins de nœuds de calcul pendant l'exécution, en fonction des caractéristiques de votre tâche. Certaines parties de votre pipeline peuvent demander davantage de temps de calcul que d'autres, et le service Cloud Dataflow peut générer automatiquement des tâches supplémentaires au cours de ces phases de votre tâche (et les arrêter lorsqu'elles ne sont plus nécessaires).

Java : SDK 2.x

L'autoscaling est activé par défaut sur toutes les tâches Cloud Dataflow par lots. Vous pouvez le désactiver en spécifiant explicitement l'option --autoscalingAlgorithm=NONE lorsque vous exécutez votre pipeline. Dans ce cas, notez que le service Cloud Dataflow définit le nombre de nœuds de calcul en fonction de l'option --numWorkers, dont la valeur par défaut est 3.

Si votre tâche Cloud Dataflow utilise une version antérieure du SDK, vous pouvez activer l'autoscaling en spécifiant l'option --autoscalingAlgorithm=THROUGHPUT_BASED lorsque vous exécutez votre pipeline.

Python

L'autoscaling est activé par défaut sur toutes les tâches Cloud Dataflow par lots créées à l'aide du SDK Apache Beam pour Python version 0.5.1 ou ultérieure. Vous pouvez le désactiver en spécifiant explicitement l'option --autoscaling_algorithm=NONE lorsque vous exécutez votre pipeline. Dans ce cas, notez que le service Cloud Dataflow définit le nombre de nœuds de calcul en fonction de l'option --num_workers, dont la valeur par défaut est 3.

Si votre tâche Cloud Dataflow utilise une version antérieure du SDK, vous pouvez activer l'autoscaling en spécifiant l'option --autoscaling_algorithm=THROUGHPUT_BASED lorsque vous exécutez votre pipeline.

Java : SDK 1.x

L'autoscaling est activé par défaut sur toutes les tâches Cloud Dataflow par lots créées à l'aide du SDK Cloud Dataflow pour Java version 1.6.0 ou ultérieure. Vous pouvez le désactiver en spécifiant explicitement l'option --autoscalingAlgorithm=NONE lorsque vous exécutez votre pipeline. Dans ce cas, notez que le service Cloud Dataflow définit le nombre de nœuds de calcul en fonction de l'option --numWorkers, dont la valeur par défaut est 3.

Si votre tâche Cloud Dataflow utilise une version antérieure du SDK, vous pouvez activer l'autoscaling en spécifiant l'option --autoscalingAlgorithm=THROUGHPUT_BASED lorsque vous exécutez votre pipeline.

Autoscaling par lots

Pour les données limitées en mode de traitement par lots, Cloud Dataflow choisit automatiquement le nombre de nœuds de calcul en fonction de la quantité de travail à chaque étape de votre pipeline et du débit actuel à cette étape.

Si votre pipeline utilise une source de données personnalisée que vous avez mise en œuvre, plusieurs méthodes vous permettent de fournir davantage d'informations à l'algorithme d'autoscaling du service Cloud Dataflow afin d'améliorer les performances :

Java : SDK 2.x

  • Dans votre sous-classe BoundedSource, mettez en œuvre la méthode getEstimatedSizeBytes. Le service Cloud Dataflow utilise getEstimatedSizeBytes lors du calcul du nombre initial de nœuds de calcul à utiliser pour votre pipeline.
  • Dans votre sous-classe BoundedReader, mettez en œuvre la méthode getFractionConsumed. Le service Cloud Dataflow utilise getFractionConsumed pour suivre la progression de la lecture et converger vers le nombre correct de nœuds de calcul à utiliser lors d'une lecture.

Python

  • Dans votre sous-classe BoundedSource, mettez en œuvre la méthode estimate_size. Le service Cloud Dataflow utilise estimate_size lors du calcul du nombre initial de nœuds de calcul à utiliser pour votre pipeline.
  • Dans votre sous-classe RangeTracker, mettez en œuvre la méthode fraction_consumed. Le service Cloud Dataflow utilise fraction_consumed pour suivre la progression de la lecture et converger vers le nombre correct de nœuds de calcul à utiliser lors d'une lecture.

Java : SDK 1.x

  • Dans votre sous-classe BoundedSource, mettez en œuvre la méthode getEstimatedSizeBytes. Le service Cloud Dataflow utilise getEstimatedSizeBytes lors du calcul du nombre initial de nœuds de calcul à utiliser pour votre pipeline.
  • Dans votre sous-classe BoundedReader, mettez en œuvre la méthode getFractionConsumed. Le service Cloud Dataflow utilise getFractionConsumed pour suivre la progression de la lecture et converger vers le nombre correct de nœuds de calcul à utiliser lors d'une lecture.

Autoscaling de flux

L'autoscaling de flux permet au service Cloud Dataflow de modifier de manière adaptative le nombre de nœuds de calcul utilisés pour exécuter votre pipeline de flux en réponse aux changements d'utilisation des ressources et de charge. L'autoscaling de flux est une fonctionnalité gratuite conçue pour réduire le coût des ressources utilisées lors de l'exécution de pipelines de flux.

Sans autoscaling, vous devez choisir un nombre fixe de nœuds de calcul en spécifiant numWorkers ou num_workers pour exécuter le pipeline. La charge de travail d'entrée varie dans le temps. Ce nombre peut donc devenir trop élevé ou trop faible. Le provisionnement d'un trop grand nombre de nœuds de calcul entraîne des coûts supplémentaires inutiles, tandis que le provisionnement d'un trop petit nombre de nœuds de calcul entraîne une latence plus élevée pour les données traitées. Avec l'activation de l'autoscaling, les ressources ne sont utilisées que lorsqu'elles sont nécessaires.

Pour prendre des décisions en matière de scaling, l'autoscaling s'appuie sur plusieurs signaux qui évaluent le niveau d'activité des nœuds de calcul et déterminent s'ils peuvent traiter le flux d'entrée. Les signaux clés incluent l'utilisation du processeur, le débit et le traitement en attente. L'objectif est de minimiser le traitement en attente tout en maximisant l'utilisation et le débit des nœuds de calcul, et de réagir rapidement aux pics de charge. En activant l'autoscaling, vous n'avez pas à choisir entre le provisionnement pour les pics de charge ou des résultats actualisés. Les nœuds de calcul sont ajoutés à mesure que l'utilisation du processeur et le traitement en attente augmentent, ou supprimés au fur et à mesure que ces métriques diminuent. Ainsi, vous ne payez que pour ce dont vous avez besoin, et la tâche est traitée aussi efficacement que possible.

Java : SDK 2.x

Sources illimitées personnalisées

Si votre pipeline utilise une source personnalisée sans limite, la source doit informer le service Cloud Dataflow du traitement en attente. Le traitement en attente est une estimation de l'entrée en octets qui n'a pas encore été traitée par la source. Pour informer le service du traitement en attente, mettez en œuvre l'une des deux méthodes suivantes dans votre classe UnboundedReader :

  • getSplitBacklogBytes() : traitement en attente pour la répartition actuelle de la source. Le service regroupe les traitements en attente dans toutes les répartitions.
  • getTotalBacklogBytes() : traitement en attente global sur toutes les répartitions. Dans certains cas, le traitement en attente n'est pas disponible pour chaque répartition et ne peut être calculé que sur toutes les répartitions. Seule la première répartition (ID de répartition "0") doit fournir le traitement en attente total.
Le dépôt Apache Beam contient plusieurs exemples de sources personnalisées qui mettent en œuvre la classe UnboundedReader.
Activer l'autoscaling de flux

Pour activer l'autoscaling, définissez les paramètres d'exécution suivants lorsque vous démarrez votre pipeline :

--autoscalingAlgorithm=THROUGHPUT_BASED
--maxNumWorkers=N

L'autoscaling peut osciller entre N/15 et N nœuds de calcul pendant l'exécution d'un pipeline, où N est la valeur de --maxNumWorkers. Par exemple, si votre pipeline a besoin de 3 ou 4 nœuds de calcul prêts à l'emploi, vous pouvez définir --maxNumWorkers=15 pour que le pipeline s'adapte automatiquement afin d'utiliser 1 à 15 nœuds de calcul.

Les pipelines de traitement par flux sont déployés avec un pool fixe de disques persistants, d'un nombre égal à --maxNumWorkers. Tenez-en compte lorsque vous spécifiez --maxNumWorkers, et assurez-vous que cette valeur représente un nombre suffisant de disques pour votre pipeline.

Utilisation et tarifs

L'utilisation de Compute Engine est basée sur le nombre moyen de nœuds de calcul, tandis que l'utilisation des disques persistants est basée sur le nombre exact de --maxNumWorkers. Les disques persistants sont redistribués de manière à ce que chaque nœud de calcul ait un nombre égal de disques associés.

Dans l'exemple ci-dessus, où --maxNumWorkers=15, vous payez entre 1 et 15 instances de Compute Engine et exactement 15 disques persistants.

Python

Activer l'autoscaling de flux

Pour activer l'autoscaling, définissez les paramètres d'exécution suivants lorsque vous démarrez votre pipeline :

--autoscaling_algorithm=THROUGHPUT_BASED
--max_num_workers=N

L'autoscaling peut osciller entre N/15 et N nœuds de calcul pendant l'exécution d'un pipeline, où N est la valeur de max_num_workers. Par exemple, si votre pipeline a besoin de 3 ou 4 nœuds de calcul prêts à l'emploi, vous pouvez définir --max_num_workers=15 pour que le pipeline s'adapte automatiquement afin d'utiliser 1 à 15 nœuds de calcul.

Les pipelines de traitement par flux sont déployés avec un pool fixe de disques persistants, d'un nombre égal à --max_num_workers. Tenez-en compte lorsque vous spécifiez --max_num_workers, et assurez-vous que cette valeur représente un nombre suffisant de disques pour votre pipeline.

Utilisation et tarifs

L'utilisation de Compute Engine est basée sur le nombre moyen de nœuds de calcul, tandis que l'utilisation des disques persistants est basée sur le nombre exact de --max_num_workers. Les disques persistants sont redistribués de manière à ce que chaque nœud de calcul ait un nombre égal de disques associés.

Dans l'exemple ci-dessus, où --max_num_workers=15, vous payez entre 1 et 15 instances de Compute Engine et exactement 15 disques persistants.

Java : SDK 1.x

Si votre pipeline utilise une source personnalisée sans limite, il est essentiel que la source informe le service Cloud Dataflow du traitement en attente. Le traitement en attente est une estimation de l'entrée en octets qui n'a pas encore été traitée par la source. Pour informer le service du traitement en attente, mettez en œuvre l'une des deux méthodes suivantes dans votre classe UnboundedReader :

  • getSplitBacklogBytes() : traitement en attente pour la répartition actuelle de la source. Le service regroupe les traitements en attente dans toutes les répartitions.
  • getTotalBacklogBytes() : traitement en attente global sur toutes les répartitions. Dans certains cas, le traitement en attente n'est pas disponible pour chaque répartition et ne peut être calculé que sur toutes les répartitions. Seule la première répartition (ID de répartition "0") doit fournir le traitement en attente total.
Activer l'autoscaling de flux

Pour activer l'autoscaling, définissez les paramètres d'exécution suivants lorsque vous démarrez votre pipeline :

--autoscalingAlgorithm=THROUGHPUT_BASED
--maxNumWorkers=<N>

L'autoscaling peut osciller entre N/15 et N nœuds de calcul pendant l'exécution d'un pipeline, où N est la valeur de --maxNumWorkers. Par exemple, si votre pipeline a besoin de 3 ou 4 nœuds de calcul prêts à l'emploi, vous pouvez définir --maxNumWorkers=15 pour que le pipeline s'adapte automatiquement afin d'utiliser 1 à 15 nœuds de calcul.

Les pipelines de traitement par flux sont déployés avec un pool fixe de disques persistants, d'un nombre égal à --maxNumWorkers. Tenez-en compte lorsque vous spécifiez --maxNumWorkers, et assurez-vous que cette valeur représente un nombre suffisant de disques pour votre pipeline.

Actuellement, PubsubIO est la seule source acceptée par l'autoscaling sur les pipelines de flux. Tous les récepteurs fournis par le SDK sont compatibles. Dans cette version bêta, l'autoscaling fonctionne de manière optimale lors de la lecture d'abonnements Cloud Pub/Sub liés à des sujets publiés avec de petits lots et lors de l'écriture dans des récepteurs avec une faible latence. Dans les cas extrêmes (abonnements Cloud Pub/Sub avec des lots de publication importants ou des récepteurs avec une latence très élevée), l'autoscaling devient moins précis. Ce comportement sera amélioré dans les prochaines versions.

Utilisation et tarifs

L'utilisation de Compute Engine est basée sur le nombre moyen de nœuds de calcul, tandis que l'utilisation des disques persistants est basée sur le nombre exact de --maxNumWorkers. Les disques persistants sont redistribués de manière à ce que chaque nœud de calcul ait un nombre égal de disques associés.

Dans l'exemple ci-dessus, où --maxNumWorkers=15, vous payez entre 1 et 15 instances de Compute Engine et exactement 15 disques persistants.

Scaling manuel d'un pipeline de traitement par flux

En attendant que l'autoscaling soit disponible globalement en mode de traitement par flux, vous pouvez utiliser une solution de contournement pour faire évoluer manuellement le nombre de nœuds de calcul exécutant votre pipeline de flux à l'aide de la fonctionnalité de mise à jour de Cloud Dataflow.

Java : SDK 2.x

Pour redimensionner votre pipeline de flux lors de l'exécution, veillez à définir les paramètres d'exécution suivants lorsque vous démarrez votre pipeline :

  • Définissez --maxNumWorkers sur le nombre maximal de nœuds de calcul que vous souhaitez affecter à votre pipeline.
  • Définissez --numWorkers sur le nombre initial de nœuds de calcul que votre pipeline doit utiliser lorsqu'il commence à s'exécuter.

Une fois votre pipeline en cours d'exécution, vous pouvez le mettre à jour et spécifier un nouveau nombre de nœuds de calcul à l'aide du paramètre --numWorkers. La valeur que vous définissez pour le nouveau paramètre --numWorkers doit être comprise entre N et --maxNumWorkers, où N est égal à --maxNumWorkers / 15.

La fonction de mise à jour remplacera votre tâche en cours d'exécution par une nouvelle tâche, en utilisant le nouveau nombre de nœuds de calcul, tout en préservant toutes les informations d'état associées à la tâche précédente.

Python

Pour redimensionner votre pipeline de flux lors de l'exécution, veillez à définir les paramètres d'exécution suivants lorsque vous démarrez votre pipeline :

  • Définissez --max_num_workers sur le nombre maximal de nœuds de calcul que vous souhaitez affecter à votre pipeline.
  • Définissez --num_workers sur le nombre initial de nœuds de calcul que votre pipeline doit utiliser lorsqu'il commence à s'exécuter.

Une fois votre pipeline en cours d'exécution, vous pouvez le mettre à jour et spécifier un nouveau nombre de nœuds de calcul à l'aide du paramètre --num_workers. La valeur que vous définissez pour le nouveau paramètre --num_workers doit être comprise entre N et --max_num_workers, où N est égal à --max_num_workers / 15.

La fonction de mise à jour remplacera votre tâche en cours d'exécution par une nouvelle tâche, en utilisant le nouveau nombre de nœuds de calcul, tout en préservant toutes les informations d'état associées à la tâche précédente.

Java : SDK 1.x

Si vous savez que vous devrez redimensionner votre pipeline de flux lors de l'exécution, veillez à définir les paramètres d'exécution suivants lorsque vous démarrez votre pipeline :

  • Définissez --maxNumWorkers sur le nombre maximal de nœuds de calcul que vous souhaitez affecter à votre pipeline.
  • Définissez --numWorkers sur le nombre initial de nœuds de calcul que votre pipeline doit utiliser lorsqu'il commence à s'exécuter.

Une fois votre pipeline en cours d'exécution, vous pouvez le mettre à jour et spécifier un nouveau nombre de nœuds de calcul à l'aide du paramètre --numWorkers. La valeur que vous définissez pour le nouveau paramètre --numWorkers doit être comprise entre N et --maxNumWorkers, où N est égal à --maxNumWorkers / 15.

La fonction de mise à jour remplacera votre tâche en cours d'exécution par une nouvelle tâche, en utilisant le nouveau nombre de nœuds de calcul, tout en préservant toutes les informations d'état associées à la tâche précédente.

Rééquilibrage dynamique des tâches

La fonctionnalité de rééquilibrage dynamique du travail du service Cloud Dataflow permet au service de repartitionner de manière dynamique le travail en fonction des conditions d'exécution. Ces conditions peuvent inclure les éléments suivants :

  • Déséquilibres dans les attributions de tâches
  • Nœuds de calcul nécessitant plus de temps que prévu pour terminer
  • Nœuds de calcul nécessitant moins de temps que prévu pour terminer

Le service Cloud Dataflow détecte automatiquement ces conditions et peut réattribuer de manière dynamique le travail à des nœuds de calcul inutilisés ou sous-utilisés afin de réduire le temps de traitement global de votre tâche.

Limites

Le rééquilibrage dynamique du travail a lieu uniquement lorsque le service Cloud Dataflow traite certaines données d'entrée en parallèle : lors de la lecture de données à partir d'une source d'entrée externe, lors de l'utilisation d'une classe PCollection intermédiaire matérialisée, ou lors de l'utilisation du résultat d'une agrégation, comme GroupByKey. Si de nombreuses étapes de votre tâche sont fusionnées, votre tâche contient moins de classes PCollection intermédiaires, et le rééquilibrage dynamique du travail est limité au nombre d'éléments dans la classe PCollection matérialisée source. Si vous voulez vous assurer que le rééquilibrage dynamique du travail peut être appliqué à une classe PCollection particulière de votre pipeline, vous pouvez empêcher la fusion de différentes manières afin de garantir un parallélisme dynamique.

Le rééquilibrage dynamique du travail ne peut pas remettre en parallèle des données plus fines qu'un seul enregistrement. Si vos données contiennent des enregistrements individuels qui entraînent des temps de traitement importants, elles peuvent retarder votre tâche, car Cloud Dataflow ne peut pas subdiviser et redistribuer un enregistrement "à chaud" individuel sur plusieurs nœuds de calcul.

Java : SDK 2.x

Si vous avez défini un nombre fixe de segments pour la sortie finale de votre pipeline (par exemple, en écrivant des données à l'aide de TextIO.Write.withNumShards), la parallélisation est limitée en fonction du nombre de segments que vous avez choisi.

Python

Si vous avez défini un nombre fixe de segments pour la sortie finale de votre pipeline (par exemple, en écrivant des données à l'aide de beam.io.WriteToText(..., num_shards=...)), Cloud Dataflow limitera la parallélisation en fonction du nombre de segments que vous avez choisi.

Java : SDK 1.x

Si vous avez défini un nombre fixe de segments pour la sortie finale de votre pipeline (par exemple, en écrivant des données à l'aide de TextIO.Write.withNumShards), la parallélisation est limitée en fonction du nombre de segments que vous avez choisi.

La limitation des segments fixes peut être considérée comme temporaire et peut être modifiée dans les futures versions du service Cloud Dataflow.

Travailler avec des sources de données personnalisées

Java : SDK 2.x

Si votre pipeline utilise une source de données personnalisée que vous fournissez, vous devez mettre en œuvre la méthode splitAtFraction pour permettre à votre source d'utiliser la fonctionnalité de rééquilibrage dynamique du travail.

Python

Si votre pipeline utilise une source de données personnalisée que vous fournissez, votre RangeTracker doit mettre en œuvre try_claim, try_split, position_at_fraction et fraction_consumed pour permettre à votre source d'utiliser la fonctionnalité de rééquilibrage dynamique du travail.

Consultez les informations de référence de l'API sur RangeTracker pour en savoir plus.

Java : SDK 1.x

Si votre pipeline utilise une source de données personnalisée que vous fournissez, vous devez mettre en œuvre la méthode splitAtFraction pour permettre à votre source d'utiliser la fonctionnalité de rééquilibrage dynamique du travail.

Utiliser et gérer des ressources

Le service Cloud Dataflow gère entièrement les ressources dans GCP, tâche par tâche. Cela inclut le démarrage et l'arrêt d'instances Compute Engine (parfois appelées nœuds de calcul ou VM), ainsi que l'accès aux buckets Cloud Storage de votre projet pour la préparation de fichiers d'E/S et temporaires. Toutefois, si votre pipeline interagit avec des technologies de stockage de données GCP telles que BigQuery ou Cloud Pub/Sub, vous devez gérer les ressources et les quotas de ces services.

Cloud Dataflow utilise un emplacement fourni par l'utilisateur dans Cloud Storage spécifiquement pour la préparation des fichiers. Cet emplacement est sous votre contrôle, et vous devez vous assurer que sa durée de vie est maintenue aussi longtemps que vos tâches lisent les données qu'il contient. Vous pouvez réutiliser un même emplacement de préparation pour plusieurs exécutions de tâches, car la mise en cache intégrée du SDK peut accélérer le démarrage de vos tâches.

Tâches

Vous pouvez exécuter jusqu'à 25 tâches Cloud Dataflow simultanées par projet GCP.

Le service Cloud Dataflow est actuellement limité au traitement des requêtes de tâches d'une taille inférieure ou égale à 20 Mo. La taille de la requête de tâche est spécifiquement liée à la représentation JSON de votre pipeline. Plus le pipeline est grand, plus la requête est importante.

Pour estimer la taille de la requête JSON de votre pipeline, exécutez-le avec l'option suivante :

Java : SDK 2.x

--dataflowJobFile=< path to output file >

Python

--dataflow_job_file=< path to output file >

Java : SDK 1.x

--dataflowJobFile=< path to output file >

Cette commande écrit une représentation JSON de votre tâche dans un fichier. La taille du fichier sérialisé représente une estimation fiable de la taille de la requête. La taille réelle est toutefois légèrement plus importante en raison de la présence d'informations supplémentaires incluses dans la requête.

Pour plus d'informations, consultez la page de dépannage concernant l'erreur "413 Corps de requête trop long / La taille de la représentation JSON sérialisée du pipeline dépasse la limite autorisée".

De plus, la taille du graphique de votre tâche ne doit pas dépasser 10 Mo. Pour plus d'informations, consultez la page de dépannage concernant l'erreur "Le graphique de la tâche est trop grand. Veuillez réessayer avec un graphique de tâche plus petit, ou diviser votre tâche en deux plus petites tâches (ou plus)".

Nœuds de calcul

Le service Cloud Dataflow autorise actuellement un maximum de 1 000 instances Compute Engine par tâche. Le type de machine par défaut est n1-standard-1 pour les tâches par lots et n1-standard-4 pour les tâches de traitement par flux. En utilisant les types de machines par défaut, le service Cloud Dataflow peut donc allouer jusqu'à 4 000 cœurs par tâche.

Cloud Dataflow accepte les nœuds de calcul des séries n1 ainsi que les types de machines personnalisés. Vous pouvez spécifier un type de machine pour votre pipeline en définissant le paramètre d'exécution approprié au moment de la création du pipeline.

Java : SDK 2.x

Pour modifier le type de machine, définissez l'option --workerMachineType.

Python

Pour modifier le type de machine, définissez l'option --worker_machine_type.

Java : SDK 1.x

Pour modifier le type de machine, définissez l'option --workerMachineType.

Quota de ressources

Le service Cloud Dataflow vérifie que votre projet GCP dispose du quota de ressources Compute Engine requis pour exécuter votre tâche, à la fois pour démarrer la tâche et pour la faire évoluer vers le nombre maximal d'instances de nœuds de calcul. Votre tâche ne peut pas démarrer si le quota de ressources disponible est insuffisant.

La fonctionnalité d'autoscaling de Cloud Dataflow est limitée par le quota Compute Engine disponible de votre projet. Si votre tâche possède un quota suffisant au démarrage, mais qu'une autre tâche utilise le reste du quota disponible de votre projet, la première tâche est exécutée, mais ne peut pas être entièrement mise à l'échelle.

Cependant, le service Cloud Dataflow ne gère pas les augmentations de quota pour les tâches dépassant les quotas de ressources de votre projet. C'est à vous d'effectuer toutes les demandes de quotas de ressources supplémentaires nécessaires. Pour cela, vous pouvez utiliser la console Google Cloud Platform.

Ressources de disques persistants

Le service Cloud Dataflow est actuellement limité à 15 disques persistants par instance de nœud de calcul lors de l'exécution d'une tâche de flux. Chaque disque persistant est local sur une machine virtuelle Compute Engine individuelle. Votre tâche ne peut pas comporter plus de nœuds de calcul que de disques persistants. L'allocation minimale de ressources est un ratio de 1:1 entre les nœuds de calcul et les disques.

La taille par défaut de chaque disque persistant est de 250 Go en mode de traitement par lots et de 400 Go en mode de traitement par flux.

Zones

Par défaut, le service Cloud Dataflow déploie les ressources Compute Engine dans la zone us-central1-f de la région us-central1. Vous pouvez ignorer ce comportement en spécifiant le paramètre --region. Si vous devez utiliser une zone spécifique pour vos ressources, utilisez le paramètre --zone lors de la création de votre pipeline. Cependant, nous vous recommandons de ne spécifier que la région, mais pas la zone. Cela permet au service Cloud Dataflow de sélectionner automatiquement la meilleure zone de la région en fonction de la capacité disponible de la zone au moment de la demande de création de tâche. Pour en savoir plus, consultez la documentation relative aux points de terminaison régionaux.

Streaming Engine

Actuellement, l'exécuteur de pipeline Cloud Dataflow exécute entièrement les étapes de votre pipeline de flux sur des machines virtuelles de calcul et consomme les ressources de processeur, la mémoire et le stockage de disque persistant du nœud de calcul. La fonctionnalité Streaming Engine de Cloud Dataflow transfère l'exécution du pipeline depuis les machines virtuelles de calcul vers le backend du service Cloud Dataflow.

Avantages de Streaming Engine

Le modèle Streaming Engine présente les avantages suivants :

  • Réduction des ressources de processeur, de mémoire et de disques persistants consommées sur les machines virtuelles de calcul. Streaming Engine fonctionne mieux avec des types de machines plus petits (n1-standard-2 plutôt que n1-standard-4) et ne nécessite pas de disque persistant en dehors d'un petit disque de démarrage pour le nœud de calcul, ce qui réduit la consommation de ressources et de quotas.
  • Autoscaling plus réactif en réponse aux variations du volume de données entrantes. Streaming Engine offre un scaling plus fluide et plus précis des nœuds de calcul.
  • Meilleure compatibilité : vous n'avez pas besoin de redéployer vos pipelines pour appliquer les mises à jour du service.

La majeure partie de la réduction des ressources de calcul est due au déchargement du travail sur le service Cloud Dataflow. Pour cette raison, des frais sont appliqués à l'utilisation de Streaming Engine. Toutefois, le montant total facturé pour des pipelines Cloud Dataflow utilisant Streaming Engine devrait rester sensiblement identique au coût des pipelines Cloud Dataflow qui n'utilisent pas cette option.

Utiliser Streaming Engine

Streaming Engine est actuellement disponible pour les pipelines de traitement par flux dans les régions ci-dessous. D'autres régions seront ajoutées ultérieurement.

  • us-central1 (Iowa)
  • us-west1 (Oregon)
  • europe-west1 (Belgique)
  • europe-west4 (Pays-Bas)
  • asia-east1 (Taïwan)
  • asia-northeast1 (Tokyo)

Java : SDK 2.x

Pour utiliser Streaming Engine pour vos pipelines de traitement par flux, spécifiez le paramètre suivant :

  • --enableStreamingEngine si vous utilisez le SDK Apache Beam pour Java en version 2.11.0 ou supérieure.
  • --experiments=enable_streaming_engine si vous utilisez le SDK Apache Beam pour Java en version 2.10.0.

Si vous utilisez Cloud Dataflow Streaming Engine pour votre pipeline, ne spécifiez pas le paramètre --zone. Au lieu de cela, spécifiez le paramètre --region et définissez sa valeur sur l'une des régions dans lesquelles Streaming Engine est actuellement disponible. Cloud Dataflow sélectionne automatiquement la zone dans la région spécifiée. Si vous spécifiez le paramètre --zone en le définissant sur une zone en dehors des régions disponibles, Cloud Dataflow signale une erreur.

Streaming Engine fonctionne mieux avec des types de machines plus petits pour les nœuds de calcul. Nous vous recommandons donc de définir --workerMachineType=n1-standard-2. Vous pouvez également définir --diskSizeGb=30, car Streaming Engine n'a besoin d'espace que pour l'image de démarrage du nœud de calcul et les journaux locaux. Ces valeurs constituent les valeurs par défaut.

Python

Pour utiliser Streaming Engine pour vos pipelines de traitement par flux, spécifiez le paramètre suivant :

--enable_streaming_engine

Si vous utilisez Cloud Dataflow Streaming Engine pour votre pipeline, ne spécifiez pas le paramètre --zone. Au lieu de cela, spécifiez le paramètre --region et définissez sa valeur sur l'une des régions dans lesquelles Streaming Engine est actuellement disponible. Cloud Dataflow sélectionne automatiquement la zone dans la région spécifiée. Si vous spécifiez le paramètre --zone en le définissant sur une zone en dehors des régions disponibles, Cloud Dataflow signale une erreur.

Streaming Engine fonctionne mieux avec des types de machines plus petits pour les nœuds de calcul. Nous vous recommandons donc de définir --machine_type=n1-standard-2. Vous pouvez également définir --disk_size_gb=30, car Streaming Engine n'a besoin d'espace que pour l'image de démarrage du nœud de calcul et les journaux locaux. Ces valeurs constituent les valeurs par défaut.

Java : SDK 1.x

Streaming Engine n'est pas compatible avec le SDK Cloud Dataflow pour Java version 1.x. Pour utiliser cette fonctionnalité, vous devez utiliser le SDK Apache Beam pour Java 2.8.0 ou version ultérieure.

Cloud Dataflow Shuffle

Cloud Dataflow Shuffle est l'opération de base derrière les transformations Cloud Dataflow telles que GroupByKey, CoGroupByKey et Combine. L'opération Cloud Dataflow Shuffle permet de partitionner et de regrouper les données par clé de manière évolutive et efficace tout en bénéficiant d'une tolérance aux pannes. Actuellement, Cloud Dataflow utilise une mise en œuvre aléatoire qui s'exécute entièrement sur les machines virtuelles de calcul et consomme des ressources de processeur, de la mémoire et du stockage de disque persistant. La fonctionnalité Cloud Dataflow Shuffle basée sur les services, disponible uniquement pour les pipelines par lots, transfère l'opération de lecture aléatoire depuis les machines virtuelles de calcul vers le backend du service Cloud Dataflow.

Avantages de Cloud Dataflow Shuffle

La fonctionnalité Cloud Dataflow Shuffle basée sur les services présente les avantages suivants :

  • Temps d'exécution plus rapide des pipelines par lots pour la majorité des types de tâches de pipeline.
  • Réduction des ressources de processeur, de mémoire et de disques persistants consommées sur les machines virtuelles de calcul.
  • Meilleur autoscaling : les machines virtuelles ne contiennent plus de données aléatoires et leur nombre peut donc être réduit plus tôt.
  • Meilleure tolérance aux pannes : une machine virtuelle défectueuse contenant des données Cloud Dataflow Shuffle n'entraîne pas l'échec de l'ensemble de la tâche, comme cela se produirait si la fonctionnalité n'était pas utilisée.

La majeure partie de la réduction des ressources de calcul est due au déchargement de la tâche en mode aléatoire sur le service Cloud Dataflow. Pour cette raison, des frais sont associés à l'utilisation de Cloud Dataflow Shuffle. Toutefois, le montant total facturé pour des pipelines Cloud Dataflow utilisant la mise en œuvre Cloud Dataflow basée sur les services devrait être inférieur ou égal au coût des pipelines Cloud Dataflow qui n'utilisent pas cette option.

Pour la majorité des types de tâches de traitement par flux, Cloud Dataflow Shuffle doit s'exécuter plus rapidement que l'implémentation en mode aléatoire qui s'exécute sur les machines virtuelles de calcul. Cependant, les temps d'exécution peuvent varier d'une exécution à l'autre. Si vous exécutez un pipeline avec des délais importants, nous vous recommandons d'allouer suffisamment de temps de mémoire tampon avant l'échéance. En outre, envisagez de demander un quota plus important pour Shuffle.

Considérations relatives aux disques

Lorsque vous utilisez la fonctionnalité Cloud Dataflow Shuffle basée sur les services, vous n'avez pas besoin de joindre des disques persistants importants à vos machines virtuelles de calcul. Cloud Dataflow associe automatiquement un petit disque de démarrage de 25 Go. Cependant, en raison de la petite taille de ce disque, il convient de tenir compte de facteurs importants lors de l'utilisation de Cloud Dataflow Shuffle :

  • Une machine virtuelle de calcul utilise une partie des 25 Go d'espace disque pour le système d'exploitation, les fichiers binaires, les journaux et les conteneurs. Les tâches qui utilisent une quantité importante de disque et qui dépassent la capacité de disque restante peuvent échouer lorsque vous utilisez Cloud Dataflow Shuffle.
  • Les tâches qui utilisent beaucoup d'E/S de disque peuvent être lentes en raison des performances du petit disque. Pour plus d'informations sur les différences de performances entre les différentes tailles de disque, consultez la page Performances des disques persistants Compute Engine.

Si l'un de ces facteurs s'applique à votre tâche, vous pouvez utiliser les options de pipeline pour spécifier une taille de disque plus importante.

Utilisation de Cloud Dataflow Shuffle

La fonctionnalité Cloud Dataflow Shuffle basée sur les services est actuellement disponible dans les régions suivantes :

  • us-central1 (Iowa)
  • us-west1 (Oregon)
  • europe-west1 (Belgique)
  • europe-west4 (Pays-Bas)
  • asia-east1 (Taïwan)
  • asia-northeast1 (Tokyo)

D'autres régions seront ajoutées ultérieurement.

Java : SDK 2.x

Pour utiliser la fonctionnalité Cloud Dataflow Shuffle basée sur les services dans vos pipelines de traitement par lot, spécifiez le paramètre suivant :
--experiments=shuffle_mode=service

Si vous utilisez Cloud Dataflow Shuffle pour votre pipeline, ne renseignez pas le paramètre --zone. Au lieu de cela, spécifiez le paramètre --region et définissez sa valeur sur l'une des régions dans lesquelles Shuffle est actuellement disponible. Cloud Dataflow sélectionne automatiquement la zone dans la région spécifiée. Si vous spécifiez le paramètre --zone en le définissant sur une zone en dehors des régions disponibles, Cloud Dataflow signale une erreur.

Python

Pour utiliser la fonctionnalité Cloud Dataflow Shuffle basée sur les services dans vos pipelines de traitement par lot, spécifiez le paramètre suivant :
--experiments=shuffle_mode=service

Si vous utilisez Cloud Dataflow Shuffle pour votre pipeline, ne renseignez pas le paramètre --zone. Au lieu de cela, spécifiez le paramètre --region et définissez sa valeur sur l'une des régions dans lesquelles Shuffle est actuellement disponible. Cloud Dataflow sélectionne automatiquement la zone dans la région spécifiée. Si vous spécifiez le paramètre --zone en le définissant sur une zone en dehors des régions disponibles, Cloud Dataflow signale une erreur.

Java : SDK 1.x

Pour utiliser la fonctionnalité Cloud Dataflow Shuffle basée sur les services dans vos pipelines de traitement par lot, spécifiez le paramètre suivant :
--experiments=shuffle_mode=service

Si vous utilisez Cloud Dataflow Shuffle pour votre pipeline, ne renseignez pas le paramètre --zone. Au lieu de cela, spécifiez le paramètre --region et définissez sa valeur sur l'une des régions dans lesquelles Shuffle est actuellement disponible. Cloud Dataflow sélectionne automatiquement la zone dans la région spécifiée. Si vous spécifiez le paramètre --zone en le définissant sur une zone en dehors des régions disponibles, Cloud Dataflow signale une erreur.

Planification flexible des ressources dans Cloud Dataflow

Cloud Dataflow FlexRS (planification flexible des ressources) réduit le coût des traitements par lots en utilisant des techniques de planification avancées, le service Cloud Dataflow Shuffle et une combinaison d'instances de VM préemptives et de VM standards. Grâce à cette exécution combinée de VM préemptives et de VM standards en parallèle, Cloud Dataflow améliore l'expérience utilisateur si Compute Engine est amené à arrêter des instances de VM préemptives lors d'un événement système. FlexRS contribue à garantir que le pipeline continue de progresser et que vous ne perdez pas le travail déjà réalisé lorsque Compute Engine préempte vos VM préemptives. Pour plus d'informations sur FlexRS, consultez la page Utiliser la planification flexible des ressources dans Cloud Dataflow.
Cette page vous a-t-elle été utile ? Évaluez-la :

Envoyer des commentaires concernant…

Besoin d'aide ? Consultez notre page d'assistance.