Comment implémenter une saga en utilisant un pattern scatter / Gather Dans MassTransit 3.0

Jimmy Boagard décrit une chaîne de restauration rapide de McDonalds comparée à un modèle de collecte par dispersion.

Image de workflow volée dans l’article ci-dessus: entrer la description de l'image ici

Réflexions initiales sur la mise en œuvre:

Pour avoir une interface commune pour tous les types d’événements FoodOrdered que toutes les stations alimentaires obtiendraient, chaque station alimentaire pourrait consumr / créer son article respectif et publier un événement commun. Ex: fries et burger station reçoit un message concernant une commande de frites, La station de frites consum la commande annonce un ItemDoneEvent que la saga écoute.

Préoccupations initiales:

Étant donné que la Saga ne se soucie pas du type de nourriture complétée, le simple fait que toute la nourriture soit terminée semble être une solution acceptable . Cependant, après avoir lu les avertissements concernant le partage des files d’attente et avoir remarqué que le filtrage de Consumer.Conditional a été supprimé avec MassTransit 3.0, il semble que le framework dise “Bad Things (TM)” se produira avec ce type d’approche. Mais je ne sais pas comment vous le feriez sans créer une demande de message et une réponse et un événement de corrélation pour chaque produit alimentaire dans la cuisine. Ex: FriesOrdered, BurgerOrdered FriesCooked, BurgerCooked. Ce serait très fastidieux si vous deviez le faire pour chaque élément de la cuisine?

Compte tenu de ce qui précède, à quoi ressemblerait un bon exemple de saga pour ce type de workflow?

Ne pourriez-vous pas “simplement” passer l’object dans la queue, en tant que paramètre d’événement? Lorsque l’auditeur de saga reçoit un événement “order completed”, il contiendra l’object terminé dans l’événement?

J’imagine qu’il est envoyé dans la queue via une méthode générique, où l’object doit implémenter IFoodOrdered

Vous pouvez ensuite implémenter une méthode virtuelle que la saga peut utiliser pour faire la chose “générique” lorsqu’elle est captée, et vous n’avez qu’à implémenter des surcharges pour ces éléments spéciaux, ce qui nécessite quelque chose de spécial?

Le problème avec le retour des événements finis dans la saga est qu’il crée un conflit sur une ressource partagée (c.-à-d. L’état de saga).

Jim a publié un autre article après celui que vous avez mentionné, qui décrit le problème et la solution. Bien sûr, il parle spécifiquement de NServiceBus, mais le problème et les concepts sont les mêmes.

https://lostechies.com/jimmybogard/2014/02/27/reducing-nservicebus-saga-load/

Créez un stockage externe. Mettez un enregistrement pour chaque élément de travail. Laissez chaque travailleur mettre son propre travail à son terme tandis que la saga interroge efficacement en utilisant la messagerie différée pour voir si tout le travail est fait.

Ensuite, vous faites toujours de la dispersion, mais le «regroupeur» a été remplacé par le modèle de gestionnaire de processus pour réduire les conflits.

Je suis tombé sur un problème similaire – besoin de publier quelques dizaines de commandes (toutes les mêmes interface, IMyRequest ) et tout attendre.

En fait, ma commande initie d’autres saga, qui publient IMyRequestDone à la fin du traitement sans marquer la saga comme étant terminée. (Il est nécessaire de les compléter plus tard.) Ainsi, au lieu de sauvegarder le nombre de sagas nestedes terminées dans la saga parente, je me suis contenté d’interroger l’état des instances de saga enfant.

Vérifiez tous les messages MyRequestDone :

 Schedule(() => FailSagaOnRequestsTimeout, x => x.CheckToken, x => { // timeout for all requests x.Delay = TimeSpan.FromMinutes(10); x.Received = e => e.CorrelateById(context => context.Message.CorrelationId); }); During(Active, When(Xxx) .ThenAsync(async context => { await context.Publish(context => new MyRequestCommand(context.Instance, "foo")); await context.Publish(context => new MyRequestCommand(context.Instance, "bar")); context.Instance.WaitingMyResponsesTimeoutedAt = DateTime.UtcNow + FailSagaOnRequestsTimeout.Delay; context.Instance.WaitingMyResponsesCount = 2; }) .TransitionTo(WaitingMyResponses) .Schedule(FailSagaOnRequestsTimeout, context => new FailSagaCommand(context.Instance)) ); During(WaitingMyResponses, When(MyRequestDone) .Then(context => { if (context.Instance.WaitingMyResponsesTimeoutedAt < DateTime.UtcNow) throw new TimeoutException(); }) .If(context => { var db = serviceProvider.GetRequiredService(); var requestsStates = db.MyRequestStates.Where(x => x.ParentSagaId == context.Instance.CorrelationId).Select(x => x.State).ToList(); var allDone = requestsStates.Count == context.Instance.WaitingMyResponsesCount && requestsStates.All(x => x != nameof(MyRequestStateMachine.Processing)); // assume 3 states of request - Processing, Done and Failed return allDone; }, x => x .Unschedule(FailSagaOnRequestsTimeout) .TransitionTo(Active)) ) .Catch(x => x.TransitionTo(Failed)) ); During(WaitingMyResponses, When(FailSagaOnRequestsTimeout.Received) .TransitionTo(Failed) 

Vérifiez périodiquement que toutes les requêtes ont été effectuées (par “Réduction du chargement de NServiceBus Saga”):

 Schedule(() => CheckAllRequestsDone, x => x.CheckToken, x => { // check interval x.Delay = TimeSpan.FromSeconds(15); x.Received = e => e.CorrelateById(context => context.Message.CorrelationId); }); During(Active, When(Xxx) .ThenAsync(async context => { await context.Publish(context => new MyRequestCommand(context.Instance, "foo")); await context.Publish(context => new MyRequestCommand(context.Instance, "bar")); context.Instance.WaitingMyResponsesTimeoutedAt = DateTime.UtcNow.AddMinutes(10); context.Instance.WaitingMyResponsesCount = 2; }) .TransitionTo(WaitingMyResponses) .Schedule(CheckAllRequestsDone, context => new CheckAllRequestsDoneCommand(context.Instance)) ); During(WaitingMyResponses, When(CheckAllRequestsDone.Recieved) .Then(context => { var db = serviceProvider.GetRequiredService(); var requestsStates = db.MyRequestStates.Where(x => x.ParentSagaId == context.Instance.CorrelationId).Select(x => x.State).ToList(); var allDone = requestsStates.Count == context.Instance.WaitingMyResponsesCount && requestsStates.All(x => x != nameof(MyRequestStateMachine.Processing)); if (!allDone) { if (context.Instance.WaitingMyResponsesTimeoutedAt < DateTime.UtcNow + CheckAllRequestsDone.Delay) throw new TimeoutException(); throw new NotAllDoneException(); } }) .TransitionTo(Active) .Catch(x => x.Schedule(CheckAllRequestsDone, context => new CheckAllRequestsDoneCommand(context.Instance))) .Catch(x => x.TransitionTo(Failed));