Pourquoi Monitor.PulseAll génère-t-il un modèle de latence «d’escalier» dans les threads signalés?

Dans une bibliothèque utilisant Monitor.PulseAll () pour la synchronisation des threads, j’ai remarqué que la latence entre le moment où PulseAll (…) est appelé et le moment où un thread est réveillé semble suivre une dissortingbution “stepping stair” – avec grandes marches. Les fils réveillés ne font presque pas de travail; et presque immédiatement à attendre sur le moniteur. Par exemple, sur une boîte de 12 cœurs avec 24 threads en attente sur un moniteur (2x Xeon5680 / Gulftown; 6 cœurs physiques par processeur; HT désactivé), la latence entre le réveil et le thread est telle que:

Latence à l'aide de Monitor.PulseAll (); Bibliothèque tierce

Les 12 premiers threads (notez que nous avons 12 cœurs) prennent entre 30 et 60 microsecondes pour répondre. Ensuite, nous commençons à obtenir de très gros sauts; avec les plateaux autour de 700, 1300, 1900 et 2600 microsecondes.

J’ai réussi à recréer ce comportement indépendamment de la bibliothèque tierce en utilisant le code ci-dessous. Ce que fait ce code est de lancer un grand nombre de threads (changer le paramètre numThreads) qui attend juste sur un moniteur, lit un horodatage, le connecte à un ConcurrentSet, puis retourne immédiatement à Waiting. Une fois par seconde, PulseAll () réveille tous les threads. Il le fait 20 fois et rapporte les latences pour la 10ème itération à la console.

using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; using System.Collections.Concurrent; using System.Diagnostics; namespace PulseAllTest { class Program { static long LastTimestamp; static long Iteration; static object SyncObj = new object(); static Stopwatch s = new Stopwatch(); static ConcurrentBag<Tuple> IterationToTicks = new ConcurrentBag<Tuple>(); static void Main(ssortingng[] args) { long numThreads = 32; for (int i = 0; i < numThreads; ++i) { Task.Factory.StartNew(ReadLastTimestampAndPublish, TaskCreationOptions.LongRunning); } s.Start(); for (int i = 0; i < 20; ++i) { lock (SyncObj) { ++Iteration; LastTimestamp = s.Elapsed.Ticks; Monitor.PulseAll(SyncObj); } Thread.Sleep(TimeSpan.FromSeconds(1)); } Console.WriteLine(String.Join("\n", from n in IterationToTicks where n.Item1 == 10 orderby n.Item2 select ((decimal)n.Item2)/TimeSpan.TicksPerMillisecond)); Console.Read(); } static void ReadLastTimestampAndPublish() { while(true) { lock(SyncObj) { Monitor.Wait(SyncObj); } IterationToTicks.Add(Tuple.Create(Iteration, s.Elapsed.Ticks - LastTimestamp)); } } } } 

En utilisant le code ci-dessus, voici un exemple de latences sur une boîte avec 8 cœurs / w hyperthreading activé (soit 16 cœurs dans le Gestionnaire des tâches) et 32 ​​threads (* 2x Xeon5550 / Gainestown; 4 cœurs physiques par processeur; HT activé):

Latence à l'aide de Monitor.PulseAll (), exemple de code

EDIT: Pour essayer de prendre NUMA hors de l’équation, voici un graphique exécutant le programme exemple avec 16 threads sur un Core i7-3770 (Ivy Bridge); 4 kernelx physiques; HT activé:

Latence à l'aide de Monitor.PulseAll (), exemple de code, pas de NUMA

Quelqu’un peut-il expliquer pourquoi Monitor.PulseAll () se comporte de cette façon?

EDIT2:

Pour essayer de montrer que ce comportement n’est pas inhérent à l’éveil simultané d’un tas de threads, j’ai répliqué le comportement du programme de test en utilisant Events; et au lieu de mesurer la latence de PulseAll (), je mesure la latence de ManualResetEvent.Set (). Le code crée un certain nombre de threads de travail, puis attend un événement ManualResetEvent.Set () sur le même object ManualResetEvent. Lorsque l’événement est déclenché, ils prennent une mesure de latence puis attendent immédiatement leur propre AutoResetEvent par thread. Bien avant la prochaine itération (500 ms avant), ManualResetEvent est réinitialisé () et chaque AutoResetEvent est défini sur Set () pour que les threads puissent attendre à nouveau sur le ManualResetEvent partagé.

J’ai hésité à poster ceci parce que cela pourrait être une audition rouge géante (je ne prétends pas que les événements et les moniteurs se comportent de la même manière) et qu’il utilise des pratiques absolument terribles pour qu’un événement se comporte comme un moniteur (j’aimerais les collègues feraient si je soumettais ceci à une revue de code); mais je pense que les résultats sont éclairants.

Ce test a été effectué sur la même machine que le test d’origine. un 2xXeon5680 / Gulftown; 6 cœurs par processeur (12 cœurs au total); Hyperthreading désactivé.

ManualResetEventLatency

Si ce n’est pas évident, c’est radicalement différent de Monitor.PulseAll; voici le premier graphique superposé sur le dernier graphique:

ManualResetEventLatency vs. Latence du moniteur

Le code utilisé pour générer ces mesures est ci-dessous:

 using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; using System.Collections.Concurrent; using System.Diagnostics; namespace MRETest { class Program { static long LastTimestamp; static long Iteration; static ManualResetEventSlim MRES = new ManualResetEventSlim(false); static List Publishers = new List(); static Stopwatch s = new Stopwatch(); static ConcurrentBag<Tuple> IterationToTicks = new ConcurrentBag<Tuple>(); static void Main(ssortingng[] args) { long numThreads = 24; s.Start(); for (int i = 0; i < numThreads; ++i) { AutoResetEvent ares = new AutoResetEvent(false); ReadLastTimestampAndPublish spinner = new ReadLastTimestampAndPublish( new AutoResetEvent(false)); Task.Factory.StartNew(spinner.Spin, TaskCreationOptions.LongRunning); Publishers.Add(spinner); } for (int i = 0; i < 20; ++i) { ++Iteration; LastTimestamp = s.Elapsed.Ticks; MRES.Set(); Thread.Sleep(500); MRES.Reset(); foreach (ReadLastTimestampAndPublish publisher in Publishers) { publisher.ARES.Set(); } Thread.Sleep(500); } Console.WriteLine(String.Join("\n", from n in IterationToTicks where n.Item1 == 10 orderby n.Item2 select ((decimal)n.Item2) / TimeSpan.TicksPerMillisecond)); Console.Read(); } class ReadLastTimestampAndPublish { public AutoResetEvent ARES { get; private set; } public ReadLastTimestampAndPublish(AutoResetEvent ares) { this.ARES = ares; } public void Spin() { while (true) { MRES.Wait(); IterationToTicks.Add(Tuple.Create(Iteration, s.Elapsed.Ticks - LastTimestamp)); ARES.WaitOne(); } } } } } 

Une différence entre ces versions est que dans le cas PulseAll – les threads répètent immédiatement la boucle, verrouillant à nouveau l’object.

Vous avez 12 cœurs, donc 12 threads sont en cours d’exécution, exécutez la boucle et entrez à nouveau dans la boucle, en verrouillant l’object (l’un après l’autre), puis en entrant l’état d’attente. Pendant tout ce temps, les autres threads attendent. Dans le cas de ManualEvent, vous avez deux événements, donc les threads ne répètent pas immédiatement la boucle, mais sont bloqués sur les événements ARES, ce qui permet aux autres threads de prendre plus rapidement possession du verrou.

J’ai simulé un comportement similaire dans PulseAll en ajoutant le sumil à la fin de la boucle dans ReadLastTimestampAndPublish. Cela permet à d’autres threads de verrouiller syncObj plus rapidement et semble améliorer les chiffres que je reçois du programme.

 static void ReadLastTimestampAndPublish() { while(true) { lock(SyncObj) { Monitor.Wait(SyncObj); } IterationToTicks.Add(Tuple.Create(Iteration, s.Elapsed.Ticks - LastTimestamp)); Thread.Sleep(TimeSpan.FromMilliseconds(100)); // <=== } } 

Pour commencer, il ne s’agit pas d’une réponse, mais simplement de mes notes concernant le SSCLI pour savoir exactement ce qui se passe. La plupart de ces choses sont bien au-dessus de ma tête, mais néanmoins intéressantes.

La descente dans le Monitor.PulseAll commence par un appel à Monitor.PulseAll , implémenté en C #:

clr\src\bcl\system\threading\monitor.cs :

 namespace System.Threading { public static class Monitor { // other methods omitted [MethodImplAtsortingbute(MethodImplOptions.InternalCall)] private static extern void ObjPulseAll(Object obj); public static void PulseAll(Object obj) { if (obj==null) { throw new ArgumentNullException("obj"); } ObjPulseAll(obj); } } } 

Les méthodes InternalCall sont routées dans clr\src\vm\ecall.cpp :

 FCFuncStart(gMonitorFuncs) FCFuncElement("Enter", JIT_MonEnter) FCFuncElement("Exit", JIT_MonExit) FCFuncElement("TryEnterTimeout", JIT_MonTryEnter) FCFuncElement("ObjWait", ObjectNative::WaitTimeout) FCFuncElement("ObjPulse", ObjectNative::Pulse) FCFuncElement("ObjPulseAll", ObjectNative::PulseAll) FCFuncElement("ReliableEnter", JIT_MonReliableEnter) FCFuncEnd() 

ObjectNative vit dans clr\src\vm\comobject.cpp :

 FCIMPL1(void, ObjectNative::PulseAll, Object* pThisUNSAFE) { CONTRACTL { MODE_COOPERATIVE; DISABLED(GC_TRIGGERS); // can't use this in an FCALL because we're in forbid gc mode until we setup a H_M_F. THROWS; SO_TOLERANT; } CONTRACTL_END; OBJECTREF pThis = (OBJECTREF) pThisUNSAFE; HELPER_METHOD_FRAME_BEGIN_1(pThis); //-[autocvtpro]------------------------------------------------------- if (pThis == NULL) COMPlusThrow(kNullReferenceException, L"NullReference_This"); pThis->PulseAll(); //-[autocvtepi]------------------------------------------------------- HELPER_METHOD_FRAME_END(); } FCIMPLEND 

OBJECTREF est un peu de magie saupoudré sur Object (l’opérateur -> est surchargé), alors OBJECTREF->PulseAll() est en fait Object->PulseAll() qui est implémenté dans clr\src\vm\object.h et transmet simplement le appel à ObjHeader->PulseAll :

 class Object { // snip public: // snip ObjHeader *GetHeader() { LEAF_CONTRACT; return PTR_ObjHeader(PTR_HOST_TO_TADDR(this) - sizeof(ObjHeader)); } // snip void PulseAll() { WRAPPER_CONTRACT; GetHeader()->PulseAll(); } // snip } 

ObjHeader::PulseAll récupère le SyncBlock , qui utilise AwareLock pour Enter et Exit le verrou sur l’object. AwareLock ( clr\src\vm\syncblk.cpp ) utilise un CLREvent ( clr\src\vm\synch.cpp ) créé en tant que MonitorEvent ( CLREvent::CreateMonitorEvent(SIZE_T) ), qui appelle UnsafeCreateEvent ( clr\src\inc\unsafe.h ) ou les méthodes de synchronisation de l’environnement d’hébergement.

clr\src\vm\syncblk.cpp :

 void ObjHeader::PulseAll() { CONTRACTL { INSTANCE_CHECK; THROWS; GC_TRIGGERS; MODE_ANY; INJECT_FAULT(COMPlusThrowOM();); } CONTRACTL_END; // The following code may cause GC, so we must fetch the sync block from // the object now in case it moves. SyncBlock *pSB = GetBaseObject()->GetSyncBlock(); // GetSyncBlock throws on failure _ASSERTE(pSB != NULL); // make sure we own the crst if (!pSB->DoesCurrentThreadOwnMonitor()) COMPlusThrow(kSynchronizationLockException); pSB->PulseAll(); } void SyncBlock::PulseAll() { CONTRACTL { INSTANCE_CHECK; NOTHROW; GC_NOTRIGGER; MODE_ANY; } CONTRACTL_END; WaitEventLink *pWaitEventLink; while ((pWaitEventLink = ThreadQueue::DequeueThread(this)) != NULL) pWaitEventLink->m_EventWait->Set(); } 

DequeueThread utilise un crst ( clr\src\vm\crst.cpp ) qui entoure les sections critiques. m_EventWait est un CLREvent manuel.

Donc, tout cela utilise des primitives du système d’exploitation à moins que le fournisseur d’hébergement par défaut ne remplace les choses.