Limiter les tâches asynchrones

Je voudrais exécuter un tas de tâches asynchrones, avec une limite sur le nombre de tâches en attente d’achèvement à un moment donné.

Supposons que vous ayez 1 000 URL et que vous ne souhaitez ouvrir que 50 demandes à la fois. mais dès qu’une demande est terminée, vous ouvrez une connexion à l’URL suivante dans la liste. De cette façon, il y a toujours exactement 50 connexions ouvertes à la fois, jusqu’à ce que la liste des URL soit épuisée.

Je veux aussi utiliser un nombre donné de threads si possible.

Je suis venu avec une méthode d’extension, ThrottleTasksAsync qui fait ce que je veux. Existe-t-il une solution plus simple? Je suppose que c’est un scénario courant.

Usage:

 class Program { static void Main(ssortingng[] args) { Enumerable.Range(1, 10).ThrottleTasksAsync(5, 2, async i => { Console.WriteLine(i); return i; }).Wait(); Console.WriteLine("Press a key to exit..."); Console.ReadKey(true); } } 

Voici le code:

 static class IEnumerableExtensions { public static async Task ThrottleTasksAsync(this IEnumerable enumerable, int maxConcurrentTasks, int maxDegreeOfParallelism, Func<Enumerable_T, Task> taskToRun) { var blockingQueue = new BlockingCollection(new ConcurrentBag()); var semaphore = new SemaphoreSlim(maxConcurrentTasks); // Run the throttler on a separate thread. var t = Task.Run(() => { foreach (var item in enumerable) { // Wait for the semaphore semaphore.Wait(); blockingQueue.Add(item); } blockingQueue.CompleteAdding(); }); var taskList = new List<Task>(); Parallel.ForEach(IterateUntilTrue(() => blockingQueue.IsCompleted), new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism }, _ => { Enumerable_T item; if (blockingQueue.TryTake(out item, 100)) { taskList.Add( // Run the task taskToRun(item) .ContinueWith(tsk => { // For effect Thread.Sleep(2000); // Release the semaphore semaphore.Release(); return tsk.Result; } ) ); } }); // Await all the tasks. return await Task.WhenAll(taskList); } static IEnumerable IterateUntilTrue(Func condition) { while (!condition()) yield return true; } } 

La méthode utilise BlockingCollection et SemaphoreSlim pour la faire fonctionner. Le régulateur est exécuté sur un thread et toutes les tâches asynchrones sont exécutées sur l’autre thread. Pour atteindre le parallélisme, j’ai ajouté un paramètre maxDegreeOfParallelism qui est transmis à une boucle Parallel.ForEach redéfinie comme une boucle while.

L’ancienne version était:

 foreach (var master = ...) { var details = ...; Parallel.ForEach(details, detail => { // Process each detail record here }, new ParallelOptions { MaxDegreeOfParallelism = 15 }); // Perform the final batch updates here } 

Mais le pool de threads est épuisé rapidement et vous ne pouvez pas faire async / await .

Bonus: pour contourner le problème dans BlockingCollection où une exception est générée dans Take() lorsque CompleteAdding() est appelée, j’utilise la surcharge TryTake avec un délai d’expiration. Si je n’utilisais pas le délai d’attente dans TryTake , cela TryTake l’utilisation de BlockingCollection car TryTake ne bloquerait pas. Y a-t-il une meilleure façon? Idéalement, il y aurait une méthode TakeAsync .

Comme suggéré, utilisez TPL Dataflow.

Un TransformBlock peut être ce que vous recherchez.

Vous définissez un MaxDegreeOfParallelism pour limiter le nombre de chaînes pouvant être transformées (c.-à-d. Combien d’URL peuvent être téléchargées) en parallèle. Vous publiez ensuite les URL dans le bloc et lorsque vous avez terminé, vous dites au bloc que vous avez terminé d’append des éléments et que vous récupérez les réponses.

 var downloader = new TransformBlock( url => Download(url), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 50 } ); var buffer = new BufferBlock(); downloader.LinkTo(buffer); foreach(var url in urls) downloader.Post(url); //or await downloader.SendAsync(url); downloader.Complete(); await downloader.Completion; IList responses; if (buffer.TryReceiveAll(out responses)) { //process responses } 

Remarque: le TransformBlock stocke ses entrées et ses sorties. Pourquoi, alors, avons-nous besoin de le lier à un BufferBlock ?

Parce que le TransformBlock ne se terminera pas tant que tous les éléments ( HttpResponse ) n’auront pas été consommés, et await downloader.Completion . Au lieu de cela, nous laissons le downloader transférer toute sa sortie à un bloc tampon dédié – puis nous attendons que le downloader se termine et inspecte le bloc tampon.

Supposons que vous ayez 1 000 URL et que vous ne souhaitez ouvrir que 50 demandes à la fois. mais dès qu’une demande est terminée, vous ouvrez une connexion à l’URL suivante dans la liste. De cette façon, il y a toujours exactement 50 connexions ouvertes à la fois, jusqu’à ce que la liste des URL soit épuisée.

La solution simple suivante est apparue plusieurs fois ici sur SO. Il n’utilise pas de code de blocage et ne crée pas de threads de manière explicite.

 const int MAX_DOWNLOADS = 50; static async Task DownloadAsync(ssortingng[] urls) { using (var semaphore = new SemaphoreSlim(MAX_DOWNLOADS)) using (var httpClient = new HttpClient()) { var tasks = urls.Select(async url => { await semaphore.WaitAsync(); try { var data = await httpClient.GetSsortingngAsync(url); Console.WriteLine(data); } finally { semaphore.Release(); } }); await Task.WhenAll(tasks); } } 

Le traitement des données téléchargées doit se faire sur un pipeline différent , avec un niveau de parallélisme différent, en particulier s’il s’agit d’un traitement lié au processeur.

Par exemple, vous voudrez probablement avoir 4 threads effectuant simultanément le traitement des données (le nombre de cœurs de processeur) et jusqu’à 50 requêtes en attente pour plus de données (qui n’utilisent pas de thread du tout). AFAICT, ce n’est pas ce que votre code fait actuellement.

C’est là que TPL Dataflow ou Rx peut être utile comme solution préférée. Cependant, il est certainement possible d’implémenter quelque chose comme ça avec le TPL simple. Notez que le seul code de blocage ici est celui qui effectue le traitement de données réel dans Task.Run :

 const int MAX_DOWNLOADS = 50; const int MAX_PROCESSORS = 4; // process data class Processing { SemaphoreSlim _semaphore = new SemaphoreSlim(MAX_PROCESSORS); HashSet _pending = new HashSet(); object _lock = new Object(); async Task ProcessAsync(ssortingng data) { await _semaphore.WaitAsync(); try { await Task.Run(() => { // simuate work Thread.Sleep(1000); Console.WriteLine(data); }); } finally { _semaphore.Release(); } } public async void QueueItemAsync(ssortingng data) { var task = ProcessAsync(data); lock (_lock) _pending.Add(task); try { await task; } catch { if (!task.IsCanceled && !task.IsFaulted) throw; // not the task's exception, rethrow // don't remove faulted/cancelled tasks from the list return; } // remove successfully completed tasks from the list lock (_lock) _pending.Remove(task); } public async Task WaitForCompleteAsync() { Task[] tasks; lock (_lock) tasks = _pending.ToArray(); await Task.WhenAll(tasks); } } // download data static async Task DownloadAsync(ssortingng[] urls) { var processing = new Processing(); using (var semaphore = new SemaphoreSlim(MAX_DOWNLOADS)) using (var httpClient = new HttpClient()) { var tasks = urls.Select(async (url) => { await semaphore.WaitAsync(); try { var data = await httpClient.GetSsortingngAsync(url); // put the result on the processing pipeline processing.QueueItemAsync(data); } finally { semaphore.Release(); } }); await Task.WhenAll(tasks.ToArray()); await processing.WaitForCompleteAsync(); } } 

Comme demandé, voici le code que j’ai fini par utiliser.

Le travail est configuré dans une configuration maître-détail et chaque maître est traité en tant que lot. Chaque unité de travail est mise en queue de cette manière:

 var success = true; // Start processing all the master records. Master master; while (null != (master = await StoredProcedures.ClaimRecordsAsync(...))) { await masterBuffer.SendAsync(master); } // Finished sending master records masterBuffer.Complete(); // Now, wait for all the batches to complete. await batchAction.Completion; return success; 

Les Masters sont mis en mémoire tampon un par un pour économiser du travail pour d’autres processus externes. Les détails de chaque maître sont dissortingbués pour le travail via masterTransform TransformManyBlock . Un BatchedJoinBlock est également créé pour collecter les détails dans un lot.

Le travail réel est effectué dans le detailTransform TransformBlock , de manière asynchrone, 150 à la fois. BoundedCapacity est défini sur 300 pour garantir que trop de Masters ne soient pas mis en mémoire tampon au début de la chaîne, tout en laissant suffisamment d’espace pour que des enregistrements détaillés soient mis en queue pour permettre le traitement de 150 enregistrements à la fois. Le bloc génère un object vers ses cibles, car il est filtré sur les liens selon qu’il s’agit d’un Detail ou d’une Exception .

Le batchAction ActionBlock collecte les résultats de tous les lots et effectue des mises à jour de firebase database en bloc, une journalisation des erreurs, etc. pour chaque lot.

Il y aura plusieurs BatchedJoinBlock s, un pour chaque maître. Étant donné que chaque ISourceBlock est ISourceBlock manière séquentielle et que chaque lot n’accepte que le nombre d’enregistrements de détails associés à un maître, les lots seront traités dans l’ordre. Chaque bloc ne génère qu’un seul groupe et est dissocié à la fin. Seul le dernier bloc de lots propage son achèvement au ActionBlock final.

Le réseau de stream de données:

 // The dataflow network BufferBlock masterBuffer = null; TransformManyBlock masterTransform = null; TransformBlock detailTransform = null; ActionBlock, IList>> batchAction = null; // Buffer master records to enable efficient throttling. masterBuffer = new BufferBlock(new DataflowBlockOptions { BoundedCapacity = 1 }); // Sequentially transform master records into a stream of detail records. masterTransform = new TransformManyBlock(async masterRecord => { var records = await StoredProcedures.GetObjectsAsync(masterRecord); // Filter the master records based on some criteria here var filteredRecords = records; // Only propagate completion to the last batch var propagateCompletion = masterBuffer.Completion.IsCompleted && masterTransform.InputCount == 0; // Create a batch join block to encapsulate the results of the master record. var batchjoinblock = new BatchedJoinBlock(records.Count(), new GroupingDataflowBlockOptions { MaxNumberOfGroups = 1 }); // Add the batch block to the detail transform pipeline's link queue, and link the batch block to the the batch action block. var detailLink1 = detailTransform.LinkTo(batchjoinblock.Target1, detailResult => detailResult is Detail); var detailLink2 = detailTransform.LinkTo(batchjoinblock.Target2, detailResult => detailResult is Exception); var batchLink = batchjoinblock.LinkTo(batchAction, new DataflowLinkOptions { PropagateCompletion = propagateCompletion }); // Unlink batchjoinblock upon completion. // (the returned task does not need to be awaited, despite the warning.) batchjoinblock.Completion.ContinueWith(task => { detailLink1.Dispose(); detailLink2.Dispose(); batchLink.Dispose(); }); return filteredRecords; }, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 }); // Process each detail record asynchronously, 150 at a time. detailTransform = new TransformBlock(async detail => { try { // Perform the action for each detail here asynchronously await DoSomethingAsync(); return detail; } catch (Exception e) { success = false; return e; } }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 150, BoundedCapacity = 300 }); // Perform the proper action for each batch batchAction = new ActionBlock, IList>>(async batch => { var details = batch.Item1.Cast(); var errors = batch.Item2.Cast(); // Do something with the batch here }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 }); masterBuffer.LinkTo(masterTransform, new DataflowLinkOptions { PropagateCompletion = true }); masterTransform.LinkTo(detailTransform, new DataflowLinkOptions { PropagateCompletion = true });