"Exactement une fois" dans Dataflow

Dataflow prend en charge le traitement de type "exactement une fois" pour les enregistrements. Cette page explique comment Dataflow met en œuvre le traitement de type "exactement une fois" tout en garantissant une faible latence.

Présentation

Les pipelines de traitement par lot utilisent toujours le traitement "exactement une fois". Par défaut, les pipelines de traitement de flux utilisent le traitement "exactement une fois", mais ils peuvent également utiliser le traitement "au moins une fois".

Le traitement de type "exactement une fois" fournit des garanties sur les résultats du traitement des enregistrements, y compris sur les résultats de chaque étape du pipeline. Plus précisément, pour chaque enregistrement qui arrive au pipeline à partir d'une source, ou qui arrive à une étape de pipeline à partir d'une étape précédente, Dataflow garantit les éléments suivants :

  • L'enregistrement est traité et n'est pas perdu.
  • Tous les résultats de traitement qui restent dans le pipeline sont reflétés au maximum une fois.

En d'autres termes, les enregistrements sont traités au moins une fois et les résultats sont validés exactement une fois.

Le traitement "exactement une fois" garantit l'exactitude des résultats, sans enregistrement en double dans la sortie. Dataflow est optimisé pour minimiser la latence tout en conservant une sémantique de type "exactement une fois". Toutefois, le traitement de type "exactement une fois" entraîne toujours des coûts de déduplication. Pour les cas d'utilisation pouvant tolérer des enregistrements en double, vous pouvez souvent réduire les coûts et améliorer la latence en activant le mode "au moins une fois". Pour vous aider à choisir entre le traitement de flux "exactement une fois" et le traitement "au moins une fois", consultez la section Définir le mode de traitement de flux pour le pipeline.

Données tardives

Le traitement de type "exactement une fois" garantit la précision du pipeline : si le pipeline traite un enregistrement, Dataflow s'assure que l'enregistrement est reflété dans la sortie et que l'enregistrement n'est pas dupliqué.

Cependant, dans un pipeline de traitement de flux, le traitement de type "exactement une fois" ne peut pas garantir que les résultats sont complets, car les enregistrements peuvent arriver en retard. Par exemple, supposons que votre pipeline effectue une agrégation sur une fenêtre temporelle, telle que Count. Avec le traitement de type "exactement une fois", le résultat est exact pour les enregistrements qui arrivent dans la fenêtre en temps opportun, mais les enregistrements tardifs peuvent être supprimés.

En règle générale, il n'existe aucun moyen de garantir l'exhaustivité d'un pipeline de traitement de flux, car en théorie, les enregistrements peuvent arriver en retard de façon arbitraire. Dans le cas limitant, il faudrait parfois attendre indéfiniment pour produire un résultat. Plus précisément, Apache Beam vous permet de configurer le seuil de suppression des données tardives et de définir à quel moment émettre les résultats agrégés. Pour en savoir plus, consultez la section Filigranes et données en retard dans la documentation Apache Beam.

Effets secondaires

Il n'est pas garanti que les effets secondaires aient une sémantique de type "exactement une fois". Il est important de noter que cela inclut l'écriture des résultats dans un magasin externe, sauf si le récepteur met également en œuvre la sémantique de type "exactement une fois".

Plus précisément, Dataflow ne garantit pas que chaque enregistrement subit chaque transformation exactement une fois. En raison de nouvelles tentatives ou d'échecs des nœuds de calcul, Dataflow peut appliquer une transformation plusieurs fois sur un même enregistrement, voire même simultanément sur plusieurs nœuds de calcul.

Dans le cadre du traitement de type "exactement une fois", Dataflow déduplique les résultats. Toutefois, si le code d'une transformation a des effets secondaires, ceux-ci peuvent se produire plusieurs fois. Par exemple, si une transformation effectue un appel à un service distant, cet appel peut être effectué plusieurs fois pour le même enregistrement. Les effets secondaires peuvent même entraîner une perte de données dans certaines situations. Par exemple, supposons qu'une transformation lit un fichier pour produire un résultat, puis supprime immédiatement le fichier sans attendre le commit du résultat. Si une erreur se produit lors du commit du résultat, Dataflow relance la transformation, mais celle-ci ne peut plus lire le fichier supprimé.

Journalisation

La sortie du journal de traitement indique que le traitement s'est produit, mais n'indique pas si les données ont été validées. Par conséquent, les fichiers journaux peuvent indiquer que les données ont été traitées plusieurs fois, même si les résultats des données traitées ne sont stockés qu'une seule fois dans l'espace de stockage persistant. En outre, les journaux ne reflètent pas toujours les données traitées et validées. Les journaux peuvent être supprimés en raison d'un ralentissement ou d'une perte résultant d'autres problèmes du service de journalisation.

Traitement de flux "exactement une fois"

Cette section explique comment Dataflow met en œuvre le traitement de type "exactement une fois" pour les tâches de traitement par flux, y compris la manière dont Dataflow gère des complexités telles que le traitement non déterministe, les données tardives et le code personnalisé.

Brassage de flux Dataflow

Les jobs Dataflow en streaming s'exécutent en parallèle sur plusieurs nœuds de calcul en attribuant des plages de travail à chaque nœud. Bien que les attributions puissent changer au fil du temps en réponse à des échecs de nœuds de calcul, à l'autoscaling ou à d'autres événements, après chaque transformation GroupByKey, tous les enregistrements ayant la même clé sont traités sur le même nœud de calcul. La transformation GroupByKey est souvent utilisée par des transformations composites, telles que Count, FileIO, etc. Pour s'assurer que les enregistrements d'une clé donnée se retrouvent sur le même nœud de calcul, les nœuds de calcul Dataflow mélangent les données entre eux à l'aide d'appels de procédure à distance (RPC).

Pour garantir que les enregistrements ne sont pas perdus lors du mélange, Dataflow utilise une sauvegarde en amont. Avec la sauvegarde en amont, le nœud de calcul qui envoie les enregistrements relance les requêtes RPC jusqu'à ce qu'il reçoive un accusé de réception positif indiquant que l'enregistrement a été reçu. Les effets secondaires du traitement de l'enregistrement sont appliqués à l'espace de stockage persistant en aval. Si le nœud de calcul qui envoie les enregistrements devient indisponible, Dataflow continue à effectuer des tentatives de RPC, ce qui garantit que chaque enregistrement est distribué au moins une fois.

Étant donné que ces nouvelles tentatives peuvent créer des doublons, chaque message est associé à un ID unique. Chaque récepteur stocke un catalogue de tous les ID déjà vus et traités. Lorsqu'un enregistrement est reçu, Dataflow recherche son ID dans le catalogue. Si cet ID est trouvé, l'enregistrement a déjà été reçu et validé, l'enregistrement est donc supprimé en tant que doublon. Pour garantir la stabilité des ID d'enregistrement, chaque sortie de chaque étape est pointée vers l'espace de stockage. Par conséquent, si le même message est envoyé plusieurs fois en raison d'appels RPC répétés, le message n'est validé pour le stockage qu'une seule fois.

Garantir une faible latence

Pour que le traitement de type "exactement une fois" soit viable, les E/S doivent être réduits, en particulier en empêchant les E/S sur chaque enregistrement. Pour ce faire, Dataflow utilise des filtres de Bloom et la récupération de mémoire.

Filtres de Bloom

Les filtres de Bloom sont des structures de données compactes qui permettent de vérifier rapidement l'appartenance à un ensemble. Dans Dataflow, chaque nœud de calcul conserve un filtre de Bloom de chaque ID qu'il voit. Lorsqu'un nouvel ID d'enregistrement arrive, le nœud de calcul recherche l'ID dans le filtre. Si le filtre renvoie la valeur "false", l'enregistrement n'est pas un doublon et le nœud de calcul ne recherche pas l'ID dans le stockage stable.

Dataflow conserve un ensemble de filtres de Bloom glissants répartis par période. Lorsqu'un enregistrement arrive, Dataflow sélectionne le filtre approprié à vérifier en fonction de l'horodatage du système. Cette étape empêche les filtres de Bloom de saturer à mesure que les filtres sont collectés, et limite également la quantité de données à scanner au démarrage.

Récupération de mémoire

Pour éviter de remplir l'espace de stockage avec des ID d'enregistrement, Dataflow utilise la récupération de mémoire pour supprimer les anciens enregistrements. Dataflow utilise l'horodatage système pour calculer un filigrane de récupération de mémoire.

Ce filigrane est basé sur le temps physique passé à attendre une étape donnée. Il fournit donc également des informations sur les parties lentes du pipeline. Ces métadonnées constituent la base de la métrique de décalage système affichée dans l'interface de surveillance de Dataflow.

Si un enregistrement arrive avec un horodatage antérieur au filigrane et si des ID ont déjà été récupérés de la mémoire, l'enregistrement est ignoré. Comme le faible filigrane qui déclenche la récupération de mémoire n'avance pas tant que les livraisons d'enregistrements n'ont pas été confirmées, ces enregistrements tardifs sont en double.

Sources non déterministes

Dataflow utilise le SDK Apache Beam pour lire les données dans les pipelines. Si le traitement échoue, Dataflow peut réessayer les lectures à partir d'une source. Dans ce cas, Dataflow doit s'assurer que chaque enregistrement unique produit par une source est enregistré exactement une fois. Pour les sources déterministes, telles que Pub/Sub Lite ou Kafka, les enregistrements sont lus en fonction d'un décalage enregistré, ce qui élimine le besoin de cette étape.

Étant donné que Dataflow ne peut pas attribuer automatiquement des ID d'enregistrement, les sources non déterministes doivent indiquer à Dataflow les ID d'enregistrement afin d'éviter les doublons. Lorsqu'une source fournit des ID uniques pour chaque enregistrement, le connecteur utilise un mélange dans le pipeline pour supprimer les doublons. Les enregistrements ayant le même ID sont filtrés. Pour obtenir un exemple de mise en œuvre du traitement de type "exactement une fois" dans Dataflow en utilisant Pub/Sub comme source, consultez la section Traitement exactement une fois de la page "Streaming avec Pub/Sub".

Lorsque vous exécutez des DoFn personnalisés dans votre pipeline, Dataflow ne garantit pas que ce code ne sera exécuté qu'une seule fois par enregistrement. Pour garantir au moins une opération de traitement en cas de défaillance des nœuds de calcul, Dataflow peut exécuter plusieurs fois un enregistrement donné via une transformation ou exécuter le même enregistrement simultanément sur plusieurs nœuds de calcul. Si vous incluez du code dans votre pipeline qui permet, par exemple, de contacter un service externe, les actions peuvent être exécutées plusieurs fois pour un enregistrement donné.

Pour rendre le traitement non déterministe effectivement déterministe, utilisez le point de contrôle. Lorsque vous utilisez le point de contrôle, chaque sortie d'une transformation est enregistrée dans un espace de stockage stable avec son ID unique avant d'être envoyée à l'étape suivante. Les nouvelles tentatives de distribution de Dataflow renvoient le résultat qui a fait l'objet d'un point de contrôle. Bien que votre code puisse s'exécuter plusieurs fois, Dataflow garantit que la sortie d'une seule de ces exécutions est stockée. Dataflow utilise un magasin cohérent qui empêche l'écriture de doublons dans un espace de stockage stable.

Diffusion des résultats "exactement une fois"

Le SDK Apache Beam inclut des récepteurs intégrés conçus pour éviter la production de doublons. Dans la mesure du possible, utilisez l'un de ces récepteurs intégrés.

Si vous devez écrire votre propre récepteur, la meilleure approche consiste à rendre votre objet fonction idempotent afin qu'il puisse faire l'objet de nouvelles tentatives aussi souvent que nécessaire sans provoquer d'effets secondaires inattendus. Toutefois, il arrive souvent qu'un composant de la transformation qui implémente la fonctionnalité de récepteur ne soit pas déterministe et change en cas de nouvelle tentative.

Par exemple, dans une agrégation par fenêtre, l'ensemble d'enregistrements dans la fenêtre peut être non déterministe. Plus précisément, la fenêtre peut tenter de se déclencher avec les éléments e0, e1 et e2. Le nœud de calcul peut planter avant d'enregistrer le traitement de la fenêtre, mais pas avant que ces éléments soient envoyés en tant qu'effet secondaire. Lorsque le nœud de calcul redémarre, la fenêtre se déclenche à nouveau et un élément e3 tardif arrive. Étant donné que cet élément arrive avant le commit de la fenêtre, il n'est pas comptabilisé en tant que donnée tardive. Par conséquent, DoFn est appelé à nouveau avec les éléments e0, e1, e2, e3. Ces éléments sont ensuite envoyés à l'opération d'effet secondaire. L'idempotence n'est pas utile dans ce scénario, car différents ensembles d'enregistrements logiques sont envoyés à chaque fois.

Pour résoudre le problème de non-déterminisme dans Dataflow, utilisez la transformation Reshuffle intégrée. Lorsque Dataflow brasse des données, il les écrit de manière durable de sorte que tous les éléments non déterministes sont stables si les opérations sont relancées après le brassage. L'utilisation de la transformation Reshuffle garantit qu'une seule version de sortie d'une DoFn peut passer au-delà d'une limite de brassage. Le modèle suivant garantit que l'opération d'effet secondaire reçoit toujours un enregistrement déterministe à la sortie :

c.apply(Window.<..>into(FixedWindows.of(Duration.standardMinutes(1))))
 .apply(GroupByKey.<..>.create())
 .apply(new PrepareOutputData())
 .apply(Reshuffle.<..>of())
 .apply(WriteToSideEffect());

Pour vous assurer que l'exécuteur Dataflow sait que les éléments doivent être stables avant d'exécuter une opération DoFn, ajoutez l'annotation RequiresStableInput au DoFn.

En savoir plus