Réduction du temps de pause de la récupération de place dans un programme Haskell

Nous développons un programme qui reçoit et transmet des “messages”, tout en gardant un historique temporaire de ces messages, afin qu’il puisse vous indiquer l’historique des messages si nécessaire. Les messages sont identifiés numériquement, ont généralement une taille d’environ 1 kilo-octet, et nous devons conserver des centaines de milliers de ces messages.

Nous souhaitons optimiser ce programme pour la latence: le délai entre l’envoi et la réception d’un message doit être inférieur à 10 millisecondes.

Le programme est écrit en Haskell et compilé avec GHC. Cependant, nous avons constaté que les pauses de récupération de mémoire sont beaucoup trop longues pour nos besoins de latence: plus de 100 millisecondes dans notre programme réel.

Le programme suivant est une version simplifiée de notre application. Il utilise un Data.Map.Ssortingct pour stocker les messages. Les messages sont des ByteSsortingng identifiées par un Int . 1 000 000 de messages sont insérés par ordre numérique croissant et les messages les plus anciens sont continuellement supprimés pour conserver l’historique à un maximum de 200 000 messages.

 module Main (main) where import qualified Control.Exception as Exception import qualified Control.Monad as Monad import qualified Data.ByteSsortingng as ByteSsortingng import qualified Data.Map.Ssortingct as Map data Msg = Msg !Int !ByteSsortingng.ByteSsortingng type Chan = Map.Map Int ByteSsortingng.ByteSsortingng message :: Int -> Msg message n = Msg n (ByteSsortingng.replicate 1024 (fromIntegral n)) pushMsg :: Chan -> Msg -> IO Chan pushMsg chan (Msg msgId msgContent) = Exception.evaluate $ let inserted = Map.insert msgId msgContent chan in if 200000 < Map.size inserted then Map.deleteMin inserted else inserted main :: IO () main = Monad.foldM_ pushMsg Map.empty (map message [1..1000000]) 

Nous avons compilé et exécuté ce programme en utilisant:

 $ ghc --version The Glorious Glasgow Haskell Compilation System, version 7.10.3 $ ghc -O2 -optc-O3 Main.hs $ ./Main +RTS -s 3,116,460,096 bytes allocated in the heap 385,101,600 bytes copied during GC 235,234,800 bytes maximum residency (14 sample(s)) 124,137,808 bytes maximum slop 600 MB total memory in use (0 MB lost due to fragmentation) Tot time (elapsed) Avg pause Max pause Gen 0 6558 colls, 0 par 0.238s 0.280s 0.0000s 0.0012s Gen 1 14 colls, 0 par 0.179s 0.250s 0.0179s 0.0515s INIT time 0.000s ( 0.000s elapsed) MUT time 0.652s ( 0.745s elapsed) GC time 0.417s ( 0.530s elapsed) EXIT time 0.010s ( 0.052s elapsed) Total time 1.079s ( 1.326s elapsed) %GC time 38.6% (40.0% elapsed) Alloc rate 4,780,213,353 bytes per MUT second Productivity 61.4% of total user, 49.9% of total elapsed 

La mésortingque importante ici est la “pause maximale” de 0,0515s, soit 51 millisecondes. Nous souhaitons réduire cela d’au moins un ordre de grandeur.

L’expérimentation montre que la longueur d’une pause GC est déterminée par le nombre de messages de l’historique. La relation est à peu près linéaire ou peut-être super-linéaire. Le tableau suivant montre cette relation. ( Vous pouvez voir nos tests de benchmarking ici , et quelques graphiques ici .)

 msgs history length max GC pause (ms) =================== ================= 12500 3 25000 6 50000 13 100000 30 200000 56 400000 104 800000 199 1600000 487 3200000 1957 6400000 5378 

Nous avons expérimenté plusieurs autres variables pour déterminer si elles peuvent réduire cette latence, mais aucune ne fait une grande différence. Parmi ces variables sans importance, citons: optimisation ( -O , -O2 ); Options RTS GC ( -G , -H , -A , -c ), nombre de cœurs ( -N ), différentes structures de données ( Data.Sequence ), la taille des messages et la quantité d’ordures de courte durée générées. Le facteur déterminant est le nombre de messages dans l’histoire.

Notre théorie de travail est que les pauses sont linéaires dans le nombre de messages, car chaque cycle de GC doit parcourir toute la mémoire accessible et la copier, qui sont clairement des opérations linéaires.

Des questions:

  • Cette théorie du temps linéaire est-elle correcte? La durée des pauses GC peut-elle être exprimée de cette manière simple, ou la réalité est-elle plus complexe?
  • Si la pause GC est linéaire dans la mémoire de travail, existe-t-il un moyen de réduire les facteurs constants impliqués?
  • Existe-t-il des options pour GC incrémental, ou quelque chose comme ça? Nous ne pouvons voir que des documents de recherche. Nous sums très disposés à négocier le débit pour une latence inférieure.
  • Existe-t-il des moyens de “partitionner” la mémoire pour des cycles GC plus petits, autres que la division en plusieurs processus?

Vous vous débrouillez plutôt bien pour avoir un temps de pause de 51 ms avec plus de 200 Mo de données en direct. Le système sur lequel je travaille a un temps de pause maximum plus grand avec la moitié de cette quantité de données en direct.

Votre hypothèse est correcte, le temps de pause majeur du CPG est directement proportionnel à la quantité de données en direct, et malheureusement, il n’ya aucun moyen de contourner le problème avec GHC actuel. Nous avons expérimenté avec le GC incrémental dans le passé, mais c’était un projet de recherche et n’a pas atteint le niveau de maturité nécessaire pour le plier dans le GHC publié.

Nous espérons que cela aidera à l’avenir dans les régions compactes: https://phabricator.haskell.org/D1264 . C’est une sorte de gestion manuelle de la mémoire où vous compactez une structure dans le tas, et le GC n’a pas à le parcourir. Cela fonctionne mieux pour les données à long terme, mais peut-être sera-t-il suffisant d’utiliser des messages individuels dans votre environnement. Nous visons à l’avoir dans 8.2 GHC.

Si vous êtes dans une configuration dissortingbuée et que vous avez un équilibreur de charge, vous pouvez jouer à certaines astuces pour éviter de prendre la pause, vous vous assurez que l’équilibreur de charge n’envoie pas de requêtes aux machines sur le sharepoint faire un GC majeur, et bien sûr s’assurer que la machine termine toujours le GC même s’il ne reçoit pas de requêtes.

J’ai essayé votre extrait de code avec une approche IOVector utilisant IOVector comme structure de données sous-jacente. Sur mon système (GHC 7.10.3, mêmes options de compilation), cela a entraîné une réduction du temps maximum (la mesure que vous avez mentionnée dans votre OP) de ~ 22%.

NB J’ai fait deux hypothèses ici:

  1. Une structure de données mutable convient parfaitement au problème (je suppose que le passage de messages implique de toute façon IO)
  2. Vos messagesId sont continus

Avec certains parameters Int et arithmétiques supplémentaires (comme lorsque les messagesId sont réinitialisés à 0 ou minBound ), il devrait être simple de déterminer si un certain message est encore dans l’historique et de le récupérer dans l’index correspondant dans le tampon de sonnerie.

Pour votre plaisir de test:

 import qualified Control.Exception as Exception import qualified Control.Monad as Monad import qualified Data.ByteSsortingng as ByteSsortingng import qualified Data.Map.Ssortingct as Map import qualified Data.Vector.Mutable as Vector data Msg = Msg !Int !ByteSsortingng.ByteSsortingng type Chan = Map.Map Int ByteSsortingng.ByteSsortingng data Chan2 = Chan2 { next :: !Int , maxId :: !Int , ringBuffer :: !(Vector.IOVector ByteSsortingng.ByteSsortingng) } chanSize :: Int chanSize = 200000 message :: Int -> Msg message n = Msg n (ByteSsortingng.replicate 1024 (fromIntegral n)) newChan2 :: IO Chan2 newChan2 = Chan2 0 0 <$> Vector.unsafeNew chanSize pushMsg2 :: Chan2 -> Msg -> IO Chan2 pushMsg2 (Chan2 ix _ store) (Msg msgId msgContent) = let ix' = if ix == chanSize then 0 else ix + 1 in Vector.unsafeWrite store ix' msgContent >> return (Chan2 ix' msgId store) pushMsg :: Chan -> Msg -> IO Chan pushMsg chan (Msg msgId msgContent) = Exception.evaluate $ let inserted = Map.insert msgId msgContent chan in if chanSize < Map.size inserted then Map.deleteMin inserted else inserted main, main1, main2 :: IO () main = main2 main1 = Monad.foldM_ pushMsg Map.empty (map message [1..1000000]) main2 = newChan2 >>= \c -> Monad.foldM_ pushMsg2 c (map message [1..1000000]) 

Je suis d’accord avec les autres – si vous avez des contraintes en temps réel difficiles, utiliser un langage GC n’est pas idéal.

Toutefois, vous pouvez envisager d’expérimenter avec d’autres structures de données disponibles plutôt que de simplement utiliser Data.Map.

Je l’ai réécrit en utilisant Data.Sequence et j’ai obtenu des améliorations prometteuses:

 msgs history length max GC pause (ms) =================== ================= 12500 0.7 25000 1.4 50000 2.8 100000 5.4 200000 10.9 400000 21.8 800000 46 1600000 87 3200000 175 6400000 350 

Même si vous optimisez la latence, j’ai remarqué que d’autres mesures s’amélioraient également. Dans le cas 200000, le temps d’exécution passe de 1,5 à 0,2 et l’utilisation totale de la mémoire passe de 600 Mo à 27 Mo.

Je dois noter que j’ai sortingché en modifiant le design:

  • J’ai retiré l’ Int du Msg , donc ce n’est pas à deux endroits.
  • Au lieu d’utiliser une carte de Int s à ByteSsortingng s, j’ai utilisé une Sequence de ByteSsortingng s, et au lieu d’un Int par message, je pense que cela peut être fait avec un Int pour toute la Sequence . En supposant que les messages ne peuvent pas être réorganisés, vous pouvez utiliser un seul décalage pour traduire le message que vous souhaitez placer dans la queue.

(J’ai inclus une fonction supplémentaire getMsg pour le démontrer.)

 {-# LANGUAGE BangPatterns #-} import qualified Control.Exception as Exception import qualified Control.Monad as Monad import qualified Data.ByteSsortingng as ByteSsortingng import Data.Sequence as S newtype Msg = Msg ByteSsortingng.ByteSsortingng data Chan = Chan Int (Seq ByteSsortingng.ByteSsortingng) message :: Int -> Msg message n = Msg (ByteSsortingng.replicate 1024 (fromIntegral n)) maxSize :: Int maxSize = 200000 pushMsg :: Chan -> Msg -> IO Chan pushMsg (Chan !offset sq) (Msg msgContent) = Exception.evaluate $ let newSize = 1 + S.length sq newSq = sq |> msgContent in if newSize <= maxSize then Chan offset newSq else case S.viewl newSq of (_ :< newSq') -> Chan (offset+1) newSq' S.EmptyL -> error "Can't happen" getMsg :: Chan -> Int -> Maybe Msg getMsg (Chan offset sq) i_ = getMsg' (i_ - offset) where getMsg' i | i < 0 = Nothing | i >= S.length sq = Nothing | otherwise = Just (Msg (S.index sq i)) main :: IO () main = Monad.foldM_ pushMsg (Chan 0 S.empty) (map message [1..5 * maxSize]) 

Eh bien, vous avez trouvé la limitation des langages avec GC: ils ne conviennent pas aux systèmes temps réel hardcore.

Vous avez 2 options:

1st Augmentez la taille du segment de mémoire et utilisez un système de mise en cache à 2 niveaux. Les messages les plus anciens sont envoyés sur disque et vous conservez les derniers messages en mémoire. Pour ce faire, utilisez la pagination du système d’exploitation. Le problème, cependant, avec cette solution est que la pagination peut être coûteuse en fonction des capacités de lecture de l’unité de mémoire secondaire utilisée.

2ème programme cette solution en utilisant “C” et l’interface avec FFI à haskell. De cette façon, vous pouvez faire votre propre gestion de la mémoire. Ce serait la meilleure option car vous pouvez contrôler vous-même la mémoire dont vous avez besoin.