File d’attente de taille fixe qui extrait automatiquement les anciennes valeurs sur les nouvelles enques

J’utilise ConcurrentQueue pour une structure de données partagée dont le but est de conserver les derniers N objects (type d’histoire).

Supposons que nous ayons un navigateur et que nous voulons avoir les 100 dernières URL parcourues. Je veux une queue qui supprime automatiquement la queue la plus ancienne lors de l’insertion d’une nouvelle entrée lorsque la capacité est pleine (100 adresses dans l’historique).

Comment puis-je accomplir cela en utilisant System.Collections ?

J’écrirais une classe de wrapper qui, sur Enqueue, vérifierait le Count puis Dequeue lorsque le nombre dépasse la limite.

  public class FixedSizedQueue { ConcurrentQueue q = new ConcurrentQueue(); private object lockObject = new object(); public int Limit { get; set; } public void Enqueue(T obj) { q.Enqueue(obj); lock (lockObject) { T overflow; while (q.Count > Limit && q.TryDequeue(out overflow)) ; } } } 

J’irais pour une légère variante … étendre ConcurrentQueue pour pouvoir utiliser les extensions Linq sur FixedSizeQueue

 public class FixedSizedQueue : ConcurrentQueue { private readonly object syncObject = new object(); public int Size { get; private set; } public FixedSizedQueue(int size) { Size = size; } public new void Enqueue(T obj) { base.Enqueue(obj); lock (syncObject) { while (base.Count > Size) { T outObj; base.TryDequeue(out outObj); } } } } 

Pour ceux qui le trouvent utile, voici un code de travail basé sur la réponse de Richard Schneider ci-dessus:

 public class FixedSizedQueue { readonly ConcurrentQueue queue = new ConcurrentQueue(); public int Size { get; private set; } public FixedSizedQueue(int size) { Size = size; } public void Enqueue(T obj) { queue.Enqueue(obj); while (queue.Count > Size) { T outObj; queue.TryDequeue(out outObj); } } } 

Pour ce que cela vaut, voici un tampon circulaire léger avec certaines méthodes marquées pour une utilisation sûre et dangereuse.

 public class CircularBuffer : IEnumerable { readonly int size; readonly object locker; int count; int head; int rear; T[] values; public CircularBuffer(int max) { this.size = max; locker = new object(); count = 0; head = 0; rear = 0; values = new T[size]; } static int Incr(int index, int size) { return (index + 1) % size; } private void UnsafeEnsureQueueNotEmpty() { if (count == 0) throw new Exception("Empty queue"); } public int Size { get { return size; } } public object SyncRoot { get { return locker; } } #region Count public int Count { get { return UnsafeCount; } } public int SafeCount { get { lock (locker) { return UnsafeCount; } } } public int UnsafeCount { get { return count; } } #endregion #region Enqueue public void Enqueue(T obj) { UnsafeEnqueue(obj); } public void SafeEnqueue(T obj) { lock (locker) { UnsafeEnqueue(obj); } } public void UnsafeEnqueue(T obj) { values[rear] = obj; if (Count == Size) head = Incr(head, Size); rear = Incr(rear, Size); count = Math.Min(count + 1, Size); } #endregion #region Dequeue public T Dequeue() { return UnsafeDequeue(); } public T SafeDequeue() { lock (locker) { return UnsafeDequeue(); } } public T UnsafeDequeue() { UnsafeEnsureQueueNotEmpty(); T res = values[head]; values[head] = default(T); head = Incr(head, Size); count--; return res; } #endregion #region Peek public T Peek() { return UnsafePeek(); } public T SafePeek() { lock (locker) { return UnsafePeek(); } } public T UnsafePeek() { UnsafeEnsureQueueNotEmpty(); return values[head]; } #endregion #region GetEnumerator public IEnumerator GetEnumerator() { return UnsafeGetEnumerator(); } public IEnumerator SafeGetEnumerator() { lock (locker) { List res = new List(count); var enumerator = UnsafeGetEnumerator(); while (enumerator.MoveNext()) res.Add(enumerator.Current); return res.GetEnumerator(); } } public IEnumerator UnsafeGetEnumerator() { int index = head; for (int i = 0; i < count; i++) { yield return values[index]; index = Incr(index, size); } } System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator() { return this.GetEnumerator(); } #endregion } 

J'aime utiliser la convention Foo()/SafeFoo()/UnsafeFoo() :

  • Foo méthodes de Foo appellent UnsafeFoo par défaut.
  • UnsafeFoo méthodes UnsafeFoo modifient librement l'état sans verrou, elles ne doivent appeler que d'autres méthodes dangereuses.
  • SafeFoo méthodes SafeFoo appellent les méthodes UnsafeFoo dans un verrou.

C'est un peu verbeux, mais il fait des erreurs évidentes, comme appeler des méthodes dangereuses en dehors d'un verrou dans une méthode qui est supposée être thread-safe, plus apparente.

Juste pour le plaisir, voici une autre mise en œuvre qui, à mon avis, répond à la plupart des préoccupations des commentateurs. En particulier, la sécurité des threads est obtenue sans locking et l’implémentation est masquée par la classe d’emballage.

 public class FixedSizeQueue : IReadOnlyCollection { private ConcurrentQueue _queue = new ConcurrentQueue(); private int _count; public int Limit { get; private set; } public FixedSizeQueue(int limit) { this.Limit = limit; } public void Enqueue(T obj) { _queue.Enqueue(obj); Interlocked.Increment(ref _count); // Calculate the number of items to be removed by this thread in a thread safe manner int currentCount; int finalCount; do { currentCount = _count; finalCount = Math.Min(currentCount, this.Limit); } while (currentCount != Interlocked.CompareExchange(ref _count, finalCount, currentCount)); T overflow; while (currentCount > finalCount && _queue.TryDequeue(out overflow)) currentCount--; } public int Count { get { return _count; } } public IEnumerator GetEnumerator() { return _queue.GetEnumerator(); } System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator() { return _queue.GetEnumerator(); } } 

Ma version est juste une sous-classe de Queue normales. Rien de spécial, mais voir tout le monde participer et ça va toujours avec le titre du sujet Je pourrais tout aussi bien le mettre ici. Il retourne également les désenregistrés au cas où.

 public sealed class SizedQueue : Queue { public int FixedCapacity { get; } public SizedQueue(int fixedCapacity) { this.FixedCapacity = fixedCapacity; } ///  /// If the total number of item exceed the capacity, the oldest ones automatically dequeues. ///  /// The dequeued value, if any. public new T Enqueue(T item) { base.Enqueue(item); if (base.Count > FixedCapacity) { return base.Dequeue(); } return default; } } 

Pour votre plaisir de codage je vous soumets le ‘ ConcurrentDeck

 public class ConcurrentDeck { private readonly int _size; private readonly T[] _buffer; private int _position = 0; public ConcurrentDeck(int size) { _size = size; _buffer = new T[size]; } public void Push(T item) { lock (this) { _buffer[_position] = item; _position++; if (_position == _size) _position = 0; } } public T[] ReadDeck() { lock (this) { return _buffer.Skip(_position).Union(_buffer.Take(_position)).ToArray(); } } } 

Exemple d’utilisation:

 void Main() { var deck = new ConcurrentDeck>(25); var handle = new ManualResetEventSlim(); var task1 = Task.Factory.StartNew(()=>{ var timer = new System.Timers.Timer(); timer.Elapsed += (s,a) => {deck.Push(new Tuple("task1",DateTime.Now));}; timer.Interval = System.TimeSpan.FromSeconds(1).TotalMilliseconds; timer.Enabled = true; handle.Wait(); }); var task2 = Task.Factory.StartNew(()=>{ var timer = new System.Timers.Timer(); timer.Elapsed += (s,a) => {deck.Push(new Tuple("task2",DateTime.Now));}; timer.Interval = System.TimeSpan.FromSeconds(.5).TotalMilliseconds; timer.Enabled = true; handle.Wait(); }); var task3 = Task.Factory.StartNew(()=>{ var timer = new System.Timers.Timer(); timer.Elapsed += (s,a) => {deck.Push(new Tuple("task3",DateTime.Now));}; timer.Interval = System.TimeSpan.FromSeconds(.25).TotalMilliseconds; timer.Enabled = true; handle.Wait(); }); System.Threading.Thread.Sleep(TimeSpan.FromSeconds(10)); handle.Set(); var outputtime = DateTime.Now; deck.ReadDeck().Select(d => new {Message = d.Item1, MilliDiff = (outputtime - d.Item2).TotalMilliseconds}).Dump(true); } 

Ajoutons une autre réponse. Pourquoi cela sur les autres?

1) la simplicité. Essayer de garantir la taille est bien et conduit à une complexité inutile qui peut présenter ses propres problèmes.

2) Implémente IReadOnlyCollection, ce qui signifie que vous pouvez utiliser Linq dessus et le transmettre à une variété de choses qui attendent IEnumerable.

3) Pas de locking. Bon nombre des solutions ci-dessus utilisent des verrous, ce qui est incorrect sur une collection sans locking.

4) Implémente le même ensemble de méthodes, propriétés et interfaces que ConcurrentQueue, y compris IProducerConsumerCollection, qui est important si vous souhaitez utiliser la collection avec BlockingCollection.

Cette implémentation pourrait éventuellement aboutir à plus d’entrées que prévu si TryDequeue échoue, mais la fréquence de cet incident ne semble pas être un code spécialisé qui gênera inévitablement les performances et provoquera ses propres problèmes inattendus.

Si vous voulez absolument garantir une taille, l’implémentation d’une méthode Prune () ou similaire semble être la meilleure idée. Vous pouvez utiliser un verrou de lecture ReaderWriterLockSlim dans les autres méthodes (y compris TryDequeue) et effectuer un locking en écriture uniquement lors de l’élagage.

 class ConcurrentFixedSizeQueue : IProducerConsumerCollection, IReadOnlyCollection, ICollection { readonly ConcurrentQueue m_concurrentQueue; readonly int m_maxSize; public int Count => m_concurrentQueue.Count; public bool IsEmpty => m_concurrentQueue.IsEmpty; public ConcurrentFixedSizeQueue (int maxSize) : this(Array.Empty(), maxSize) { } public ConcurrentFixedSizeQueue (IEnumerable initialCollection, int maxSize) { if (initialCollection == null) { throw new ArgumentNullException(nameof(initialCollection)); } m_concurrentQueue = new ConcurrentQueue(initialCollection); m_maxSize = maxSize; } public void Enqueue (T item) { m_concurrentQueue.Enqueue(item); if (m_concurrentQueue.Count > m_maxSize) { T result; m_concurrentQueue.TryDequeue(out result); } } public void TryPeek (out T result) => m_concurrentQueue.TryPeek(out result); public bool TryDequeue (out T result) => m_concurrentQueue.TryDequeue(out result); public void CopyTo (T[] array, int index) => m_concurrentQueue.CopyTo(array, index); public T[] ToArray () => m_concurrentQueue.ToArray(); public IEnumerator GetEnumerator () => m_concurrentQueue.GetEnumerator(); IEnumerator IEnumerable.GetEnumerator () => GetEnumerator(); // Explicit ICollection implementations. void ICollection.CopyTo (Array array, int index) => ((ICollection)m_concurrentQueue).CopyTo(array, index); object ICollection.SyncRoot => ((ICollection) m_concurrentQueue).SyncRoot; bool ICollection.IsSynchronized => ((ICollection) m_concurrentQueue).IsSynchronized; // Explicit IProducerConsumerCollection implementations. bool IProducerConsumerCollection.TryAdd (T item) => ((IProducerConsumerCollection) m_concurrentQueue).TryAdd(item); bool IProducerConsumerCollection.TryTake (out T item) => ((IProducerConsumerCollection) m_concurrentQueue).TryTake(out item); public override int GetHashCode () => m_concurrentQueue.GetHashCode(); public override bool Equals (object obj) => m_concurrentQueue.Equals(obj); public override ssortingng ToSsortingng () => m_concurrentQueue.ToSsortingng(); } 

Eh bien, cela dépend de l’utilisation J’ai remarqué que certaines des solutions ci-dessus peuvent dépasser la taille lorsqu’elles sont utilisées dans un environnement multi-thread. Quoi qu’il en soit, mon cas d’utilisation consistait à afficher les 5 derniers événements et plusieurs threads écrivent des événements dans la queue et un autre à en lire et à l’afficher dans un contrôle Winform. C’était donc ma solution.

EDIT: Comme nous utilisons déjà le locking dans notre implémentation, nous n’avons pas vraiment besoin de ConcurrentQueue, cela peut améliorer les performances.

 class FixedSizedConcurrentQueue { readonly Queue queue = new Queue(); readonly object syncObject = new object(); public int MaxSize { get; private set; } public FixedSizedConcurrentQueue(int maxSize) { MaxSize = maxSize; } public void Enqueue(T obj) { lock (syncObject) { queue.Enqueue(obj); while (queue.Count > MaxSize) { queue.Dequeue(); } } } public T[] ToArray() { T[] result = null; lock (syncObject) { result = queue.ToArray(); } return result; } public void Clear() { lock (syncObject) { queue.Clear(); } } } 

Ceci est ma version de la queue:

 public class FixedSizedQueue { private object LOCK = new object(); ConcurrentQueue queue; public int MaxSize { get; set; } public FixedSizedQueue(int maxSize, IEnumerable items = null) { this.MaxSize = maxSize; if (items == null) { queue = new ConcurrentQueue(); } else { queue = new ConcurrentQueue(items); EnsureLimitConstraint(); } } public void Enqueue(T obj) { queue.Enqueue(obj); EnsureLimitConstraint(); } private void EnsureLimitConstraint() { if (queue.Count > MaxSize) { lock (LOCK) { T overflow; while (queue.Count > MaxSize) { queue.TryDequeue(out overflow); } } } } ///  /// returns the current snapshot of the queue ///  ///  public T[] GetSnapshot() { return queue.ToArray(); } } 

Je trouve utile d’avoir un constructeur basé sur un IEnumerable et je trouve utile d’avoir un GetSnapshot pour avoir une liste de sécurité multithread (tableau dans ce cas) des éléments au moment de l’appel, qui ne se lève pas des erreurs si la collection sous-jacente change.

La vérification du double compte sert à empêcher le locking dans certaines circonstances.