C# 基于消息发布订阅模型的示例(上)
在我们的开发过程中,我们经常会遇到这样的场景就是一个对象的其中的一些状态依赖于另外的一个对象的状态,而且这两个对象之间彼此是没有关联的,及两者之间的耦合性非常低,特别是在这种基于容器模型的开发中遇到的会非常多,比如Prism框架或者MEF这种框架中,而我们会发现在这样的系统中我们经常使用一种Publish和Subscribe的模式来进行交互,这种交互有什么好处呢?基于带着这些问题的思考,我们来一步步来剖析!
首先第一步就是定义一个叫做IEventAggregator的接口,里面定义了一些重载的Subscribe和Publish方法,我们具体来看一看这个接口:
////// Enablesloosely-coupledpublicationofandsubscriptiontoevents. /// publicinterfaceIEventAggregator { ////// Getsorsetsthedefaultpublicationthreadmarshaller. /// ////// Thedefaultpublicationthreadmarshaller. /// ActionPublicationThreadMarshaller{get;set;} /// /// Subscribesaninstancetoalleventsdeclaredthroughimplementationsof ////// Theinstancetosubscribeforeventpublication. voidSubscribe(objectinstance); /// /// Unsubscribestheinstancefromallevents. /// ///Theinstancetounsubscribe. voidUnsubscribe(objectinstance); /// /// Publishesamessage. /// ///Themessageinstance. /// /// Usesthedefaultthreadmarshallerduringpublication. /// voidPublish(objectmessage); ////// Publishesamessage. /// ///Themessageinstance. /// Allowsthepublishertoprovideacustomthreadmarshallerforthemessagepublication. voidPublish(objectmessage,Action marshal); }
有了这个接口,接下来就是怎样去实现这个接口中的各种方法,我们来看看具体的实现过程。
////// Enablesloosely-coupledpublicationofandsubscriptiontoevents. /// publicclassEventAggregator:IEventAggregator { ////// Thedefaultthreadmarshallerusedforpublication; /// publicstaticActionDefaultPublicationThreadMarshaller=action=>action(); readonlyList handlers=newList (); /// /// Initializesanewinstanceofthe publicEventAggregator() { PublicationThreadMarshaller=DefaultPublicationThreadMarshaller; } ///class. /// /// Getsorsetsthedefaultpublicationthreadmarshaller. /// ////// Thedefaultpublicationthreadmarshaller. /// publicActionPublicationThreadMarshaller{get;set;} /// /// Subscribesaninstancetoalleventsdeclaredthroughimplementationsof ////// Theinstancetosubscribeforeventpublication. publicvirtualvoidSubscribe(objectinstance) { lock(handlers) { if(handlers.Any(x=>x.Matches(instance))) { return; } handlers.Add(newHandler(instance)); } } /// /// Unsubscribestheinstancefromallevents. /// ///Theinstancetounsubscribe. publicvirtualvoidUnsubscribe(objectinstance) { lock(handlers) { varfound=handlers.FirstOrDefault(x=>x.Matches(instance)); if(found!=null) { handlers.Remove(found); } } } /// /// Publishesamessage. /// ///Themessageinstance. /// /// Doesnotmarshallthethepublicationtoanyspecialthreadbydefault. /// publicvirtualvoidPublish(objectmessage) { Publish(message,PublicationThreadMarshaller); } ////// Publishesamessage. /// ///Themessageinstance. /// Allowsthepublishertoprovideacustomthreadmarshallerforthemessagepublication. publicvirtualvoidPublish(objectmessage,Action marshal) { Handler[]toNotify; lock(handlers) { toNotify=handlers.ToArray(); } marshal(()=> { varmessageType=message.GetType(); vardead=toNotify .Where(handler=>!handler.Handle(messageType,message)) .ToList(); if(dead.Any()) { lock(handlers) { foreach(varhandlerindead) { handlers.Remove(handler); } } } }); } protectedclassHandler { readonlyWeakReferencereference; readonlyDictionary supportedHandlers=newDictionary (); publicHandler(objecthandler) { reference=newWeakReference(handler); varinterfaces=handler.GetType().GetInterfaces() .Where(x=>typeof(IHandle).IsAssignableFrom(x)&&x.IsGenericType); foreach(var@interfaceininterfaces) { vartype=@interface.GetGenericArguments()[0]; varmethod=@interface.GetMethod("Handle"); supportedHandlers[type]=method; } } publicboolMatches(objectinstance) { returnreference.Target==instance; } publicboolHandle(TypemessageType,objectmessage) { vartarget=reference.Target; if(target==null) returnfalse; foreach(varpairinsupportedHandlers) { if(pair.Key.IsAssignableFrom(messageType)) { pair.Value.Invoke(target,new[]{message}); returntrue; } } returntrue; } } }
首先在EventAggregator的内部维护了一个LIst
我们会发现在每一次当执行这个Subscribe的方法的时候,会将当前object类型的参数instance传入到Handler这个对象中,在Handler这个类的构造函数中,首先将这个instance放入到一个弱引用中去,然后再获取这个对象所有继承的接口,并查看是否继承了IHandle
publicinterfaceIHandle{} ////// Denotesaclasswhichcanhandleaparticulartypeofmessage. /// ///Thetypeofmessagetohandle. publicinterfaceIHandle :IHandle { /// /// Handlesthemessage. /// ///Themessage. voidHandle(TMessagemessage); }
在看完了Subscribe这个方法后,后面我们就来看看Unsubscribe方法吧,这个思路其实很简单就是找到List
////// Publishesamessage. /// ///Themessageinstance. /// Allowsthepublishertoprovideacustomthreadmarshallerforthemessagepublication. publicvirtualvoidPublish(objectmessage,Action marshal) { Handler[]toNotify; lock(handlers) { toNotify=handlers.ToArray(); } marshal(()=> { varmessageType=message.GetType(); vardead=toNotify .Where(handler=>!handler.Handle(messageType,message)) .ToList(); if(dead.Any()) { lock(handlers) { foreach(varhandlerindead) { handlers.Remove(handler); } } } }); }
我们看到,在发布一个object类型的message的时候,必然对应着另外的一个对象来处理这个消息,那么怎样找到这个消息的处理这呢?
对,我们在Subscribe一个对象的时候不是已经通过反射将订阅这个消息的对象及方法都存在了一个List
这里面还有一个要点就是,如果执行的方法返回了false,就是执行不成功,那么就从当前的List
1所有消息订阅的对象必须实现统一的接口IHandle
2整个EventAggregator必须是单实例或者是静态的,这样才能够在统一的集合中去实现上述的各种操作。
最后还是按照之前的惯例,最后给出一个具体的实例来做相关的说明,请点击此处进行下载,在下篇中我们将介绍一种简单版的基于事件的发布和订阅模式的例子。
以上就是C#基于消息发布订阅模型的示例(上)的详细内容,更多关于c#发布订阅模型的资料请关注毛票票其它相关文章!
声明:本文内容来源于网络,版权归原作者所有,内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎发送邮件至:czq8825#qq.com(发邮件时,请将#更换为@)进行举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。