Foreach parallèle avec lambda asynchrone

Je voudrais gérer une collection en parallèle, mais j’ai du mal à l’implémenter et j’espère donc avoir de l’aide.

Le problème se pose si je veux appeler une méthode marquée async en C #, dans le lambda de la boucle parallèle. Par exemple:

var bag = new ConcurrentBag(); Parallel.ForEach(myCollection, async item => { // some pre stuff var response = await GetData(item); bag.Add(response); // some post stuff } var count = bag.Count; 

Le problème se produit avec le nombre 0, car tous les threads créés ne sont en réalité que des threads d’arrière-plan et l’appel Parallel.ForEach n’attend pas la fin. Si je supprime le mot-clé async, la méthode ressemble à ceci:

 var bag = new ConcurrentBag(); Parallel.ForEach(myCollection, item => { // some pre stuff var responseTask = await GetData(item); responseTask.Wait(); var response = responseTask.Result; bag.Add(response); // some post stuff } var count = bag.Count; 

Cela fonctionne, mais cela désactive complètement l’intelligence et je dois faire quelques manipulations manuelles des exceptions.

Comment puis-je implémenter une boucle Parallel.ForEach , qui utilise le mot-clé wait dans le lambda? C’est possible?

Le prototype de la méthode Parallel.ForEach prend un paramètre Action , mais je veux qu’il attende mon lambda asynchrone.

Si vous voulez simplement un parallélisme simple, vous pouvez le faire:

 var bag = new ConcurrentBag(); var tasks = myCollection.Select(async item => { // some pre stuff var response = await GetData(item); bag.Add(response); // some post stuff }); await Task.WhenAll(tasks); var count = bag.Count; 

Si vous avez besoin de quelque chose de plus complexe, consultez l’article de Stephen Toub sur ForEachAsync .

Vous pouvez utiliser la méthode d’extension ParallelForEachAsync partir du package AsyncEnumerator NuGet :

 using System.Collections.Async; var bag = new ConcurrentBag(); await myCollection.ParallelForEachAsync(async item => { // some pre stuff var response = await GetData(item); bag.Add(response); // some post stuff }, maxDegreeOfParallelism: 10); var count = bag.Count; 

J’ai créé une méthode d’extension pour cela qui utilise SemaphoreSlim et permet également de définir le degré maximum de parallélisme

  ///  /// Concurrently Executes async actions for each item of  ///  /// Type of IEnumerable /// instance of "/> /// an async  to execute /// Optional, An integer that represents the maximum degree of parallelism, /// Must be grater than 0 /// A Task representing an async operation /// If the maxActionsToRunInParallel is less than 1 public static async Task ForEachAsyncConcurrent( this IEnumerable enumerable, Func action, int? maxDegreeOfParallelism = null) { if (maxDegreeOfParallelism.HasValue) { using (var semaphoreSlim = new SemaphoreSlim( maxDegreeOfParallelism.Value, maxDegreeOfParallelism.Value)) { var tasksWithThrottler = new List(); foreach (var item in enumerable) { // Increment the number of currently running tasks and wait if they are more than limit. await semaphoreSlim.WaitAsync(); tasksWithThrottler.Add(Task.Run(async () => { await action(item); // action is completed, so decrement the number of currently running tasks semaphoreSlim.Release(); })); } // Wait for all tasks to complete. await Task.WhenAll(tasksWithThrottler.ToArray()); } } else { await Task.WhenAll(enumerable.Select(item => action(item))); } } 

Échantillon utilisation:

 await enumerable.ForEachAsyncConcurrent( async item => { await SomeAsyncMethod(item); }, 5);