// let's say there is a list of 1000+ URLs ssortingng[] urls = { "http://google.com", "http://yahoo.com", ... }; // now let's send HTTP requests to each of these URLs in parallel urls.AsParallel().ForAll(async (url) => { var client = new HttpClient(); var html = await client.GetSsortingngAsync(url); });
Voici le problème, il démarre plus de 1000 requêtes Web simultanées. Existe-t-il un moyen facile de limiter le nombre de demandes HTTP asynchrones? Ainsi, pas plus de 20 pages Web sont téléchargées à un moment donné. Comment le faire de la manière la plus efficace?
Vous pouvez certainement le faire dans les dernières versions d’Async pour .NET, en utilisant .NET 4.5 Beta. Le post précédent de «usr» pointe vers un bon article écrit par Stephen Toub, mais la nouvelle moins annoncée est que le sémaphore asynchrone est effectivement entré dans la version bêta de .NET 4.5.
Si vous regardez notre classe bien-aimée SemaphoreSlim
(que vous devriez utiliser car elle est plus performante que le Semaphore
original), elle dispose désormais de la série de surcharges WaitAsync(...)
, avec tous les arguments attendus – intervalles de délai d’attente, jetons d’annulation , tous vos amis habituels du planning 🙂
Stephen a également écrit un article de blog plus récent sur les nouveaux goodies .NET 4.5 sortis avec la version bêta, voir Nouveautés pour le parallélisme dans .NET 4.5 Beta .
Enfin, voici un exemple de code sur la façon d’utiliser SemaphoreSlim pour la limitation de la méthode asynchrone:
public async Task MyOuterMethod() { // let's say there is a list of 1000+ URLs var urls = { "http://google.com", "http://yahoo.com", ... }; // now let's send HTTP requests to each of these URLs in parallel var allTasks = new List(); var throttler = new SemaphoreSlim(initialCount: 20); foreach (var url in urls) { // do an async wait until we can schedule again await throttler.WaitAsync(); // using Task.Run(...) to run the lambda in its own parallel // flow on the threadpool allTasks.Add( Task.Run(async () => { try { var client = new HttpClient(); var html = await client.GetSsortingngAsync(url); } finally { throttler.Release(); } })); } // won't get here until all urls have been put into tasks await Task.WhenAll(allTasks); // won't get here until all tasks have completed in some way // (either success or exception) }
Enfin, une solution digne de ce nom est une solution qui utilise la planification basée sur la TPL. Vous pouvez créer des tâches liées aux delegates sur la TPL qui n’ont pas encore été démarrées et autoriser un planificateur de tâches personnalisé à limiter la concurrence. En fait, il y a un exemple MSDN pour cela ici:
Voir aussi TaskScheduler .
Malheureusement, le .NET Framework manque les plus importants combinateurs pour orchestrer des tâches asynchrones parallèles. Il n’y a pas une telle chose intégrée.
Regardez la classe AsyncSemaphore construite par le plus respectable Stephen Toub. Ce que vous voulez s’appelle un sémaphore et vous avez besoin d’une version asynchrone.
Si vous avez un IEnumerable (c’est-à-dire des chaînes d’URL) et que vous souhaitez effectuer une opération liée aux E / S avec chacune (c.-à-d. Faire une requête http asynchrone) ET éventuellement vous définissez également le nombre maximal de concurrents Les demandes d’E / S en temps réel, voici comment vous pouvez le faire. De cette façon, vous n’utilisez pas le pool de threads et autres, la méthode utilise semaphoreslim pour contrôler les demandes d’E / S simultanées similaires à un modèle de fenêtre glissante qu’une requête termine, quitte le sémaphore et le suivant entre.
utilisation: attendez ForEachAsync (urlSsortingngs, YourAsyncFunc, optionalMaxDegreeOfConcurrency);
public static Task ForEachAsync( IEnumerable inputEnumerable, Func asyncProcessor, int? maxDegreeOfParallelism = null) { int maxAsyncThreadCount = maxDegreeOfParallelism ?? DefaultMaxDegreeOfParallelism; SemaphoreSlim throttler = new SemaphoreSlim(maxAsyncThreadCount, maxAsyncThreadCount); IEnumerable tasks = inputEnumerable.Select(async input => { await throttler.WaitAsync().ConfigureAwait(false); try { await asyncProcessor(input).ConfigureAwait(false); } finally { throttler.Release(); } }); return Task.WhenAll(tasks); }
Theo Yaung exemple est bien, mais il existe une variante sans liste de tâches en attente.
class SomeChecker { private const int ThreadCount=20; private CountdownEvent _countdownEvent; private SemaphoreSlim _throttler; public Task Check(IList urls) { _countdownEvent = new CountdownEvent(urls.Count); _throttler = new SemaphoreSlim(ThreadCount); return Task.Run( // prevent UI thread lock async () =>{ foreach (var url in urls) { // do an async wait until we can schedule again await _throttler.WaitAsync(); ProccessUrl(url); // NOT await } //instead of await Task.WhenAll(allTasks); _countdownEvent.Wait(); }); } private async Task ProccessUrl(ssortingng url) { try { var page = await new WebClient() .DownloadSsortingngTaskAsync(new Uri(url)); ProccessResult(page); } finally { _throttler.Release(); _countdownEvent.Signal(); } } private void ProccessResult(ssortingng page){/*....*/} }
Il y a beaucoup de pièges et l’utilisation directe d’un sémaphore peut être délicate dans les cas d’erreur. Je suggère donc d’utiliser AsyncEnumerator NuGet Package au lieu de réinventer la roue:
// let's say there is a list of 1000+ URLs ssortingng[] urls = { "http://google.com", "http://yahoo.com", ... }; // now let's send HTTP requests to each of these URLs in parallel await urls.ParallelForEachAsync(async (url) => { var client = new HttpClient(); var html = await client.GetSsortingngAsync(url); }, maxDegreeOfParallelism: 20);
SemaphoreSlim peut être très utile ici. Voici la méthode d’extension que j’ai créée.
/// /// Concurrently Executes async actions for each item of /// /// Type of IEnumerable /// instance of "/> /// an async to execute /// Optional, max numbers of the actions to run in parallel, /// Must be grater than 0 /// A Task representing an async operation /// If the maxActionsToRunInParallel is less than 1 public static async Task ForEachAsyncConcurrent( this IEnumerable enumerable, Func action, int? maxActionsToRunInParallel = null) { if (maxActionsToRunInParallel.HasValue) { using (var semaphoreSlim = new SemaphoreSlim( maxActionsToRunInParallel.Value, maxActionsToRunInParallel.Value)) { var tasksWithThrottler = new List(); foreach (var item in enumerable) { // Increment the number of currently running tasks and wait if they are more than limit. await semaphoreSlim.WaitAsync(); tasksWithThrottler.Add(Task.Run(async () => { await action(item); // action is completed, so decrement the number of currently running tasks semaphoreSlim.Release(); })); } // Wait for all of the provided tasks to complete. await Task.WhenAll(tasksWithThrottler.ToArray()); } } else { await Task.WhenAll(enumerable.Select(item => action(item))); } }
Échantillon utilisation:
await enumerable.ForEachAsyncConcurrent( async item => { await SomeAsyncMethod(item); }, 5);
Des calculs parallèles doivent être utilisés pour accélérer les opérations liées au processeur. Nous parlons ici d’opérations liées aux E / S. Votre implémentation doit être purement asynchrone , à moins que vous ne remplissiez le kernel unique occupé de votre processeur multicœur.
EDIT j’aime la suggestion faite par usr d’utiliser un “sémaphore asynchrone” ici.
Bien que 1000 tâches puissent être mises en queue très rapidement, la bibliothèque Parallel Tasks ne peut gérer que des tâches simultanées égales à la quantité de cœurs de processeur de la machine. Cela signifie que si vous avez une machine à quatre cœurs, seules 4 tâches seront exécutées à un moment donné (à moins que vous ne diminuiez le MaxDegreeOfParallelism).
Utilisez MaxDegreeOfParallelism
, qui est une option que vous pouvez spécifier dans Parallel.ForEach()
:
var options = new ParallelOptions { MaxDegreeOfParallelism = 20 }; Parallel.ForEach(urls, options, url => { var client = new HttpClient(); var html = client.GetSsortingngAsync(url); // do stuff with html });
Ancienne question, nouvelle réponse. @vitidev avait un bloc de code qui était réutilisé presque intact dans un projet que j’ai examiné. Après avoir discuté avec quelques collègues, un a demandé “Pourquoi n’utilisez-vous pas simplement les méthodes TPL intégrées?” ActionBlock ressemble au vainqueur. https://msdn.microsoft.com/en-us/library/hh194773(v=vs.110).aspx . Cela ne changera probablement aucun code existant, mais cherchera certainement à adopter ce nuget et à réutiliser les meilleures pratiques de M. Softy pour le parallélisme limité.
Essentiellement, vous allez vouloir créer une action ou une tâche pour chaque URL à atteindre, les placer dans une liste, puis traiter cette liste, en limitant le nombre pouvant être traité en parallèle.
Mon article de blog montre comment procéder à la fois avec Tasks et avec Actions et fournit un exemple de projet que vous pouvez télécharger et exécuter pour voir les deux en action.
Si vous utilisez Actions, vous pouvez utiliser la fonction .Net Parallel.Invoke intégrée. Ici, nous nous limitons à exécuter au maximum 20 threads en parallèle.
var listOfActions = new List(); foreach (var url in urls) { var localUrl = url; // Note that we create the Task here, but do not start it. listOfTasks.Add(new Task(() => CallUrl(localUrl))); } var options = new ParallelOptions {MaxDegreeOfParallelism = 20}; Parallel.Invoke(options, listOfActions.ToArray());
Avec les tâches, il n’y a pas de fonction intégrée. Cependant, vous pouvez utiliser celui que je fournis sur mon blog.
/// /// Starts the given tasks and waits for them to complete. This will run, at most, the specified number of tasks in parallel. /// NOTE: If one of the given tasks has already been started, an exception will be thrown. /// /// The tasks to run. /// The maximum number of tasks to run in parallel. /// The cancellation token. public static async Task StartAndWaitAllThrottledAsync(IEnumerable tasksToRun, int maxTasksToRunInParallel, CancellationToken cancellationToken = new CancellationToken()) { await StartAndWaitAllThrottledAsync(tasksToRun, maxTasksToRunInParallel, -1, cancellationToken); } /// /// Starts the given tasks and waits for them to complete. This will run the specified number of tasks in parallel. /// NOTE: If a timeout is reached before the Task completes, another Task may be started, potentially running more than the specified maximum allowed. /// NOTE: If one of the given tasks has already been started, an exception will be thrown. /// /// The tasks to run. /// The maximum number of tasks to run in parallel. /// The maximum milliseconds we should allow the max tasks to run in parallel before allowing another task to start. Specify -1 to wait indefinitely. /// The cancellation token. public static async Task StartAndWaitAllThrottledAsync(IEnumerable tasksToRun, int maxTasksToRunInParallel, int timeoutInMilliseconds, CancellationToken cancellationToken = new CancellationToken()) { // Convert to a list of tasks so that we don't enumerate over it multiple times needlessly. var tasks = tasksToRun.ToList(); using (var throttler = new SemaphoreSlim(maxTasksToRunInParallel)) { var postTaskTasks = new List (); // Have each task notify the throttler when it completes so that it decrements the number of tasks currently running. tasks.ForEach(t => postTaskTasks.Add(t.ContinueWith(tsk => throttler.Release()))); // Start running each task. foreach (var task in tasks) { // Increment the number of tasks currently running and wait if too many are running. await throttler.WaitAsync(timeoutInMilliseconds, cancellationToken); cancellationToken.ThrowIfCancellationRequested(); task.Start(); } // Wait for all of the provided tasks to complete. // We wait on the list of "post" tasks instead of the original tasks, otherwise there is a potential race condition where the throttler's using block is exited before some Tasks have had their "post" action completed, which references the throttler, resulting in an exception due to accessing a disposed object. await Task.WhenAll(postTaskTasks.ToArray()); } }
Et puis en créant votre liste de tâches et en appelant la fonction pour les faire fonctionner, avec un maximum de 20 simultanées à la fois, vous pouvez le faire:
var listOfTasks = new List(); foreach (var url in urls) { var localUrl = url; // Note that we create the Task here, but do not start it. listOfTasks.Add(new Task(async () => await CallUrl(localUrl))); } await Tasks.StartAndWaitAllThrottledAsync(listOfTasks, 20);