Créer une queue bloquante dans .NET?

J’ai un scénario où plusieurs threads s’ajoutent à une queue et plusieurs threads lisent dans la même queue. Si la queue atteint une taille spécifique, tous les threads qui remplissent la queue seront bloqués sur add jusqu’à ce qu’un élément soit supprimé de la queue.

La solution ci-dessous est ce que j’utilise actuellement et ma question est la suivante: comment cela peut-il être amélioré? Existe-t-il un object qui active déjà ce comportement dans la BCL que je devrais utiliser?

internal class BlockingCollection : CollectionBase, IEnumerable { //todo: might be worth changing this into a proper QUEUE private AutoResetEvent _FullEvent = new AutoResetEvent(false); internal T this[int i] { get { return (T) List[i]; } } private int _MaxSize; internal int MaxSize { get { return _MaxSize; } set { _MaxSize = value; checkSize(); } } internal BlockingCollection(int maxSize) { MaxSize = maxSize; } internal void Add(T item) { Trace.WriteLine(ssortingng.Format("BlockingCollection add waiting: {0}", Thread.CurrentThread.ManagedThreadId)); _FullEvent.WaitOne(); List.Add(item); Trace.WriteLine(ssortingng.Format("BlockingCollection item added: {0}", Thread.CurrentThread.ManagedThreadId)); checkSize(); } internal void Remove(T item) { lock (List) { List.Remove(item); } Trace.WriteLine(ssortingng.Format("BlockingCollection item removed: {0}", Thread.CurrentThread.ManagedThreadId)); } protected override void OnRemoveComplete(int index, object value) { checkSize(); base.OnRemoveComplete(index, value); } internal new IEnumerator GetEnumerator() { return List.GetEnumerator(); } private void checkSize() { if (Count < MaxSize) { Trace.WriteLine(string.Format("BlockingCollection FullEvent set: {0}", Thread.CurrentThread.ManagedThreadId)); _FullEvent.Set(); } else { Trace.WriteLine(string.Format("BlockingCollection FullEvent reset: {0}", Thread.CurrentThread.ManagedThreadId)); _FullEvent.Reset(); } } } 

Cela semble très dangereux (très peu de synchronisation); que diriez-vous de quelque chose comme:

 class SizeQueue { private readonly Queue queue = new Queue(); private readonly int maxSize; public SizeQueue(int maxSize) { this.maxSize = maxSize; } public void Enqueue(T item) { lock (queue) { while (queue.Count >= maxSize) { Monitor.Wait(queue); } queue.Enqueue(item); if (queue.Count == 1) { // wake up any blocked dequeue Monitor.PulseAll(queue); } } } public T Dequeue() { lock (queue) { while (queue.Count == 0) { Monitor.Wait(queue); } T item = queue.Dequeue(); if (queue.Count == maxSize - 1) { // wake up any blocked enqueue Monitor.PulseAll(queue); } return item; } } } 

(modifier)

En réalité, vous voudriez un moyen de fermer la queue pour que les lecteurs commencent à sortir proprement – peut-être quelque chose comme un drapeau booléen – si elle est définie, une queue vide revient (plutôt que bloquante):

 bool closing; public void Close() { lock(queue) { closing = true; Monitor.PulseAll(queue); } } public bool TryDequeue(out T value) { lock (queue) { while (queue.Count == 0) { if (closing) { value = default(T); return false; } Monitor.Wait(queue); } value = queue.Dequeue(); if (queue.Count == maxSize - 1) { // wake up any blocked enqueue Monitor.PulseAll(queue); } return true; } } 

Utilisez .net 4 BlockingCollection, pour mettre en queue, utilisez Add (), pour dé-mettre en queue, utilisez Take (). Il utilise en interne ConcurrentQueue non bloquant. Plus d’infos ici Technique de queue Producteur / consommateur rapide et meilleur BlocageCollection vs File d’attente simultanée

“Comment ça pourrait être amélioré?”

Eh bien, vous devez examiner chaque méthode de votre classe et examiner ce qui se passerait si un autre thread appelait simultanément cette méthode ou toute autre méthode. Par exemple, vous placez un verrou dans la méthode Remove, mais pas dans la méthode Add. Que se passe-t-il si un thread ajoute en même temps qu’un autre thread? Supprime? Mauvaises choses.

Considérez également qu’une méthode peut renvoyer un second object permettant d’accéder aux données internes du premier object, par exemple, GetEnumerator. Imaginez qu’un thread traverse cet énumérateur, un autre thread modifie la liste en même temps. Pas bon.

En règle générale, simplifiez la tâche en réduisant au minimum le nombre de méthodes de la classe.

En particulier, n’héritez pas d’une autre classe de conteneur, car vous exposerez toutes les méthodes de cette classe, ce qui permettra à l’appelant de corrompre les données internes ou d’afficher partiellement les modifications apscopes aux données (tout comme les données). apparaît corrompu à ce moment). Cachez tous les détails et soyez impitoyable sur la façon dont vous leur permettez l’access.

Je vous conseille vivement d’utiliser des solutions standard – obtenez un livre sur le threading ou utilisez une bibliothèque tierce. Sinon, compte tenu de ce que vous tentez, vous allez déboguer votre code pendant longtemps.

En outre, cela n’aurait-il pas plus de sens de supprimer un élément (par exemple, celui qui a été ajouté en premier, car il s’agit d’une queue), plutôt que l’appelant choisissant un élément spécifique? Et lorsque la queue est vide, Remove peut également bloquer.

Mise à jour: la réponse de Marc met en œuvre toutes ces suggestions! 🙂 Mais je vais laisser cela ici car il peut être utile de comprendre pourquoi sa version est une telle amélioration.

Vous pouvez utiliser BlockingCollection et ConcurrentQueue dans l’espace de noms System.Collections.Concurrent

  public class ProducerConsumerQueue : BlockingCollection { ///  /// Initializes a new instance of the ProducerConsumerQueue, Use Add and TryAdd for Enqueue and TryEnqueue and Take and TryTake for Dequeue and TryDequeue functionality ///  public ProducerConsumerQueue() : base(new ConcurrentQueue()) { } ///  /// Initializes a new instance of the ProducerConsumerQueue, Use Add and TryAdd for Enqueue and TryEnqueue and Take and TryTake for Dequeue and TryDequeue functionality ///  ///  public ProducerConsumerQueue(int maxSize) : base(new ConcurrentQueue(), maxSize) { } } 

Je viens de renverser cela en utilisant les extensions réactives et je me suis souvenu de cette question:

 public class BlockingQueue { private readonly Subject _queue; private readonly IEnumerator _enumerator; private readonly object _sync = new object(); public BlockingQueue() { _queue = new Subject(); _enumerator = _queue.GetEnumerator(); } public void Enqueue(T item) { lock (_sync) { _queue.OnNext(item); } } public T Dequeue() { _enumerator.MoveNext(); return _enumerator.Current; } } 

Pas nécessairement tout à fait sûr, mais très simple.

C’est ce que je suis venu opter pour une queue de blocage bornée.

 using System; using System.Collections.Generic; using System.Text; using System.Threading; public class BlockingBuffer { private Object t_lock; private Semaphore sema_NotEmpty; private Semaphore sema_NotFull; private T[] buf; private int getFromIndex; private int putToIndex; private int size; private int numItems; public BlockingBuffer(int Capacity) { if (Capacity <= 0) throw new ArgumentOutOfRangeException("Capacity must be larger than 0"); t_lock = new Object(); buf = new T[Capacity]; sema_NotEmpty = new Semaphore(0, Capacity); sema_NotFull = new Semaphore(Capacity, Capacity); getFromIndex = 0; putToIndex = 0; size = Capacity; numItems = 0; } public void put(T item) { sema_NotFull.WaitOne(); lock (t_lock) { while (numItems == size) { Monitor.Pulse(t_lock); Monitor.Wait(t_lock); } buf[putToIndex++] = item; if (putToIndex == size) putToIndex = 0; numItems++; Monitor.Pulse(t_lock); } sema_NotEmpty.Release(); } public T take() { T item; sema_NotEmpty.WaitOne(); lock (t_lock) { while (numItems == 0) { Monitor.Pulse(t_lock); Monitor.Wait(t_lock); } item = buf[getFromIndex++]; if (getFromIndex == size) getFromIndex = 0; numItems--; Monitor.Pulse(t_lock); } sema_NotFull.Release(); return item; } } 

Je n’ai pas complètement exploré le TPL, mais il pourrait y avoir quelque chose qui correspond à vos besoins, ou à tout le moins, un certain nombre de sources de Reflector pour trouver de l’inspiration.

J’espère que cela pourra aider.

Eh bien, vous pourriez regarder la classe System.Threading.Semaphore . A part ça – non, vous devez le faire vous-même. AFAIK il n’y a pas une telle collection intégrée.

Si vous voulez un débit maximum, permettant à plusieurs lecteurs de lire et un seul graveur à écrire, BCL a quelque chose appelé ReaderWriterLockSlim qui devrait aider à affiner votre code …