springboot websocket集群(stomp协议)连接时候传递参数
最近在公司项目中接到个需求。就是后台跟前端浏览器要保持长连接,后台主动往前台推数据。
网上查了下,websocketstomp协议处理这个很简单。尤其是跟springboot集成。
但是由于开始是单机玩的,很顺利。
但是后面部署到生产搞集群的话,就会出问题了。
假如集群两个节点,浏览器A与节点A建立连接,A节点发的消息浏览器A节点肯定能收到。但是B节点由于没有跟浏览器A建立连接。B节点发的消息浏览器就收不到了。
网上也查了好多,但是没有一个说的很清楚的,也很多都是理论层面的。
还有很多思路都是通过session获取信息的。但是这都不是我需要的。我需要的是从前台传递参数,连接的时候每个节点保存下。然后通过SimpleUserRegistry.getUser获取。
话不多说,直接上代码。
varWEB_SOCKET={ topic:"", url:"", stompClient:null, connect:function(url,topic,callback,userid){ this.url=url; this.topic=topic; varsocket=newSockJS(url);//连接SockJS的endpoint名称为"endpointOyzc" WEB_SOCKET.stompClient=Stomp.over(socket);//使用STMOP子协议的WebSocket客户端 WEB_SOCKET.stompClient.connect({userid:userid},function(frame){//连接WebSocket服务端 //console.log('Connected:'+frame); //通过stompClient.subscribe订阅/topic/getResponse目标(destination)发送的消息 WEB_SOCKET.stompClient.subscribe(topic,callback); }); } };
这是响应的前端代码。只需要引入两个js。调用newSockJS(url)就代表跟服务器建立连接了。
@Configuration //注解开启使用STOMP协议来传输基于代理(messagebroker)的消息,这时控制器支持使用@MessageMapping,就像使用@RequestMapping一样 @EnableWebSocketMessageBroker publicclassWebSocketConfigextendsAbstractWebSocketMessageBrokerConfigurer{ @Autowired privateGetHeaderParamInterceptorgetHeaderParamInterceptor; @Override //注册STOMP协议的节点(endpoint),并映射指定的url publicvoidregisterStompEndpoints(StompEndpointRegistryregistry){ //注册一个STOMP的endpoint,并指定使用SockJS协议 registry.addEndpoint("/endpointOyzc") .setAllowedOrigins("*") .withSockJS(); /*registry.addEndpoint("/endpointOyzc") .setAllowedOrigins("*") .setHandshakeHandler(xlHandshakeHandler) .withSockJS();*/ } @Override //配置消息代理(MessageBroker) publicvoidconfigureMessageBroker(MessageBrokerRegistryregistry){ //点对点应配置一个/user消息代理,广播式应配置一个/topic消息代理 registry.enableSimpleBroker("/topic","/user"); //全局使用的消息前缀(客户端订阅路径上会体现出来) //registry.setApplicationDestinationPrefixes("/app"); //点对点使用的订阅前缀(客户端订阅路径上会体现出来),不设置的话,默认也是/user/ registry.setUserDestinationPrefix("/user"); } /** *采用自定义拦截器,获取connect时候传递的参数 * *@paramregistration */ @Override publicvoidconfigureClientInboundChannel(ChannelRegistrationregistration){ registration.interceptors(getHeaderParamInterceptor); } }
注:上面的endpointOyzc就是前端的url。后面注册端点,前台链接。
然后注意下configureClientInboundChannel这个方法,这个方法里面注入拦截器就是为了链接时候接收参数的。
/** *@author:hao *@description:websocket建立链接的时候获取headeri里认证的参数拦截器。 *@time:2019/7/320:42 */ @Component publicclassGetHeaderParamInterceptorextendsChannelInterceptorAdapter{ @Override publicMessage>preSend(Message>message,MessageChannelchannel){ StompHeaderAccessoraccessor=MessageHeaderAccessor.getAccessor(message,StompHeaderAccessor.class); if(StompCommand.CONNECT.equals(accessor.getCommand())){ Objectraw=message.getHeaders().get(SimpMessageHeaderAccessor.NATIVE_HEADERS); if(rawinstanceofMap){ Objectname=((Map)raw).get("userid"); if(nameinstanceofLinkedList){ //设置当前访问的认证用户 accessor.setUser(newJqxxPrincipal(((LinkedList)name).get(0).toString())); } } } returnmessage; } } /** *@author:hao *@description:自定义的java.security.Principal *@time:2019/7/320:42 */ publicclassJqxxPrincipalimplementsPrincipal{ privateStringloginName; publicJqxxPrincipal(StringloginName){ this.loginName=loginName; } @Override publicStringgetName(){ returnloginName; } }
这样就存入的前台传的参数。
后台发消息的时候怎么发呢?
/** *@author:hao *@description:websocket发送代理,负责发送消息 *@time:2019/7/411:01 */ @Component @Slf4j publicclassWebsocketSendProxy{ @Autowired privateSimpMessagingTemplatetemplate; @Autowired privateSimpUserRegistryuserRegistry; @Resource(name="redisServiceImpl") privateRedisServiceredisService; @Value("spring.redis.message.topic-name") privateStringtopicName; publicvoidsendMsg(RedisWebsocketMsg redisWebsocketMsg){ SimpUsersimpUser=userRegistry.getUser(redisWebsocketMsg.getReceiver()); log.info("发送消息前获取接收方为{},根据Registry获取本节点上这个用户{}",redisWebsocketMsg.getReceiver(),simpUser); if(simpUser!=null&&StringUtils.isNotBlank(simpUser.getName())){ //2.获取WebSocket客户端的订阅地址 WebSocketChannelEnumchannelEnum=WebSocketChannelEnum.fromCode(redisWebsocketMsg.getChannelCode()); if(channelEnum!=null){ //3.给WebSocket客户端发送消息 template.convertAndSendToUser(redisWebsocketMsg.getReceiver(),channelEnum.getSubscribeUrl(),redisWebsocketMsg.getContent()); } }else{ //给其他订阅了主题的节点发消息,因为本节点没有 redisService.convertAndSend(topicName,redisWebsocketMsg); } } }
可以发现上面代码利用了redis监听模型,也就是redis模型的消息队列
/** *@author:hao *@description:redis消息监听实现类,接收处理类 *@time:2019/7/314:00 */ @Component @Slf4j publicclassMessageReceiver{ @Autowired privateSimpMessagingTemplatemessagingTemplate; @Autowired privateSimpUserRegistryuserRegistry; /** *处理WebSocket消息 */ publicvoidreceiveMessage(RedisWebsocketMsgredisWebsocketMsg){ log.info(MessageFormat.format("ReceivedMessage:{0}",redisWebsocketMsg)); //1.取出用户名并判断是否连接到当前应用节点的WebSocket SimpUsersimpUser=userRegistry.getUser(redisWebsocketMsg.getReceiver()); if(simpUser!=null&&StringUtils.isNotBlank(simpUser.getName())){ //2.获取WebSocket客户端的订阅地址 WebSocketChannelEnumchannelEnum=WebSocketChannelEnum.fromCode(redisWebsocketMsg.getChannelCode()); if(channelEnum!=null){ //3.给WebSocket客户端发送消息 messagingTemplate.convertAndSendToUser(redisWebsocketMsg.getReceiver(),channelEnum.getSubscribeUrl(),redisWebsocketMsg.getContent()); } } } }
redis消息模型只贴部分代码就好了
/** *消息监听器 */ @Bean MessageListenerAdaptermessageListenerAdapter(MessageReceivermessageReceiver,Jackson2JsonRedisSerializer
上面的思路大体如下:客户端简历链接时候,传过来userid保存起来。发消息的时候通过userRegistry获取,能获取到就证明是跟本节点建立的链接,直接用本节点发消息就好了。
如果不是就利用redis消息队列,把消息推出去。每个节点去判断获取看下是不是本节点的userid。这样就实现了集群的部署。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。