Je voudrais gérer une collection en parallèle, mais j’ai du mal à l’implémenter et j’espère donc avoir de l’aide.
Le problème se pose si je veux appeler une méthode marquée async en C #, dans le lambda de la boucle parallèle. Par exemple:
var bag = new ConcurrentBag(); Parallel.ForEach(myCollection, async item => { // some pre stuff var response = await GetData(item); bag.Add(response); // some post stuff } var count = bag.Count;
Le problème se produit avec le nombre 0, car tous les threads créés ne sont en réalité que des threads d’arrière-plan et l’appel Parallel.ForEach
n’attend pas la fin. Si je supprime le mot-clé async, la méthode ressemble à ceci:
var bag = new ConcurrentBag(); Parallel.ForEach(myCollection, item => { // some pre stuff var responseTask = await GetData(item); responseTask.Wait(); var response = responseTask.Result; bag.Add(response); // some post stuff } var count = bag.Count;
Cela fonctionne, mais cela désactive complètement l’intelligence et je dois faire quelques manipulations manuelles des exceptions.
Comment puis-je implémenter une boucle Parallel.ForEach
, qui utilise le mot-clé wait dans le lambda? C’est possible?
Le prototype de la méthode Parallel.ForEach prend un paramètre Action
, mais je veux qu’il attende mon lambda asynchrone.
Si vous voulez simplement un parallélisme simple, vous pouvez le faire:
var bag = new ConcurrentBag
Si vous avez besoin de quelque chose de plus complexe, consultez l’article de Stephen Toub sur ForEachAsync
.
Vous pouvez utiliser la méthode d’extension ParallelForEachAsync
partir du package AsyncEnumerator NuGet :
using System.Collections.Async; var bag = new ConcurrentBag
J’ai créé une méthode d’extension pour cela qui utilise SemaphoreSlim et permet également de définir le degré maximum de parallélisme
/// /// Concurrently Executes async actions for each item of /// /// Type of IEnumerable /// instance of "/> /// an async to execute /// Optional, An integer that represents the maximum degree of parallelism, /// Must be grater than 0 /// A Task representing an async operation /// If the maxActionsToRunInParallel is less than 1 public static async Task ForEachAsyncConcurrent( this IEnumerable enumerable, Func action, int? maxDegreeOfParallelism = null) { if (maxDegreeOfParallelism.HasValue) { using (var semaphoreSlim = new SemaphoreSlim( maxDegreeOfParallelism.Value, maxDegreeOfParallelism.Value)) { var tasksWithThrottler = new List(); foreach (var item in enumerable) { // Increment the number of currently running tasks and wait if they are more than limit. await semaphoreSlim.WaitAsync(); tasksWithThrottler.Add(Task.Run(async () => { await action(item); // action is completed, so decrement the number of currently running tasks semaphoreSlim.Release(); })); } // Wait for all tasks to complete. await Task.WhenAll(tasksWithThrottler.ToArray()); } } else { await Task.WhenAll(enumerable.Select(item => action(item))); } }
Échantillon utilisation:
await enumerable.ForEachAsyncConcurrent( async item => { await SomeAsyncMethod(item); }, 5);