本文最后更新于:2020年12月30日 凌晨
场景 & 需求
客户端发送请求后,服务端进行处理后可以对所有的客户端进行 广播
服务端可以在任何时候主动对所有客户端进行 广播
客户端发送请求后,服务端进行处理后可以对指定客户端进行 点对点推送
服务端可以在任何时候主动对指定客户端进行 点对点推送
服务端可以在任何时候主动对指定某些客户端进行 广播
服务端可以识别客户端(状态),并以此进行 点对点推送
前置要求 本文假设你已经了解或知道以下技能,尤其而且是勾选的内容。
引入依赖 创建一个 SpringBoot
项目,并添加 spring-boot-starter-websocket
依赖
1 2 3 4 5 <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-websocket</artifactId > </dependency >
配置 SpringBoot WebSocket
支持 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 @Configuration @EnableWebSocketMessageBroker public class SpringWebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer { @Override public void registerStompEndpoints (StompEndpointRegistry stompEndpointRegistry) { stompEndpointRegistry.addEndpoint("/endpoint" ) .setAllowedOrigins("*" ) .withSockJS(); } @Override public void configureMessageBroker (MessageBrokerRegistry registry) { registry.enableSimpleBroker("/topic" , "/user" ); } }
双向广播服务端 客户端发送请求后,服务端进行处理后可以对所有的客户端进行 广播
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 @Controller public class BilateralBroadcastingSocket { @MessageMapping(value = "/talk") @SendTo("/topic/broadcasting/bilateral/allClient") public String talk (@Payload String text, @Header("simpSessionId") String sessionId) throws InterruptedException { Thread.sleep(1000L ); return "[ " + sessionId + "] 说: [" + text + "]" ; } }
双向广播客户端 向服务端发送消息,并监听服务端的广播。客户端发送消息与监听是分离的,也可以只向服务端发送消息而不监听广播,或者只接收广播不发送消息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 <!DOCTYPE html > <html lang ="en" > <head > <meta charset ="UTF-8" /> <meta name ="viewport" content ="width=device-width, initial-scale=1.0" /> <meta http-equiv ="X-UA-Compatible" content ="ie=edge" /> <title > Document</title > </head > <body > <script type ="application/javascript" src ="https://cdn.bootcss.com/sockjs-client/1.1.4/sockjs.min.js" > </script > <script type ="application/javascript" src ="https://cdn.bootcss.com/stomp.js/2.3.3/stomp.min.js" > </script > <script > let socket = new SockJS ('http://127.0.0.1:8080/endpoint' ) stompClient = Stomp .over (socket) stompClient.connect ( {}, (frame ) => { console .log ('服务端 Socket 连接建立' ) const sessionId = /\/([^\/]+)\/websocket/ .exec ( socket._transport .url , )[1 ] console .log ('connected, session id: ' + sessionId) stompClient.subscribe ( '/topic/broadcasting/bilateral/allClient' , (res ) => { console .log (`[广播(双向通信)]: ${res.body} ` ) }, ) send () }, (error ) => { console .log ('Socket 连接失败' ) }, ) function send ( ) { var headers = {} var body = { message : '消息内容' , } stompClient.send ('/talk' , headers, JSON .stringify (body)) } window .onbeforeunload = function ( ) { if (stompClient !== null ) { stompClient.disconnect () socket.close () } console .log ('断开连接' ) } </script > </body > </html >
单向广播服务端 从服务端推送消息到所有客户端,是单向推送到客户端的,不接受从客户端的输入。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 @Controller public class UnidirectionalBroadcastingSocket { @SendTo("/topic/broadcasting/unidirectional/allClient") public Object broadcasting () { return null ; } }@Component public class ScheduledRefreshJob { @Autowired private SimpMessagingTemplate simpMessagingTemplate; @Scheduled(fixedDelay = 10 * 1000) public void scheduledBroadcasting () { simpMessagingTemplate.convertAndSend("/topic/broadcasting/unidirectional/allClient" , new Person (1L , "rx" , false )); } }
单向广播客户端 客户端只需要添加一个监听器就好了,不需要也不能向服务端发送消息。
1 2 3 4 5 6 7 const subscription_broadcast = stompClient.subscribe ( '/topic/broadcasting/unidirectional/allClient' , (response ) => { console .log (`[广播(服务端单向推送)]: ${response.body} ` ) }, )
点对点推送服务端 服务端使用 @SendToUser(path)
向单个客户端推送消息,这里的 @Header("simpSessionId")
指的是从客户端的请求头中的 simpSessionId
参数赋值给 sessionId
(WebSocket
会话 ID
,和用户 Session
不同,是每一个 WebSocket
唯一的 #即和用户不是一一对应)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 @Controller public class BilateralPushSocket { @MessageMapping("/speak") @SendToUser("/push/bilateral/thisClient") public String speak (@Payload String text, @Header("simpSessionId") String sessionId) throws InterruptedException { Thread.sleep(1000L ); return "[ " + sessionId + "] send: [" + text + "]" ; } }
点对点推送客户端 客户端请求的路径需要注意一下,是以 /user/${sessionId}
开头,后面才是 @SendToUser(path)
中设置的 path
。
1 2 3 4 stompClient.subscribe (`/user/${sessionId} /push/bilateral/thisClient` , (res ) => { console .log (`[点对点推送(双向通信)]: ${res.body} ` ) })
单向点对点推送服务端 其实和上面双向的点对点推送没什么太大的差别,就是只用 @SendToUser(path)
而不用 @MessageMapping(path)
了而已
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 @Controller public class UnidirectionalPushSocket { @SendToUser("/push/unidirectional/thisClient") public Object push () { return null ; } }@Component public class ScheduledRefreshJob { @Autowired private SimpMessagingTemplate simpMessagingTemplate; @Scheduled(fixedDelay = 10 * 1000) public void scheduledPush () { simpMessagingTemplate.convertAndSendToUser("r2qspi4s" , "/push/unidirectional/thisClient" , new Person (2L , "琉璃" , false )); } }
单向点对点推送客户端 客户端和上面的双向点对点推送基本一致(完全一样好么?!)
1 2 3 4 5 6 7 stompClient.subscribe ( `/user/${sessionId} /push/unidirectional/thisClient` , (res ) => { console .log (`[点对点推送(单向通信)]:${res.body} ` ) }, )
记录 user -> Socket 会话对应的映射表 上面的点对点推送客户端几乎是没什么用处的(尤其而且是 单向点对点推送 ),因为每次创建的 Socket
连接都会变化,而没有与用户建立对应关系的话怎无法知道哪个用户对应的哪个人,也就不能发送消息给指定的用户(非 Socket Session Id
)了 。
首先需要一个记录用户 Socket Session Id
的类,并注册为 SpringBoot 的组件。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 @Component public class SocketSessionRegistry { public static final String DIRECT_TOURIST = "DIRECT_TOURIST" ; private final ConcurrentMap<String, Set<String>> userSessionIds = new ConcurrentHashMap <>(); private final Object lock = new Object (); public Set<String> getSessionIds (String user) { Set<String> set = this .userSessionIds.get(user); return set != null ? set : Collections.emptySet(); } public ConcurrentMap<String, Set<String>> getAllSessionIds () { return this .userSessionIds; } public void registerSessionId (String user, String sessionId) { Assert.notNull(user, "User must not be null" ); Assert.notNull(sessionId, "Session ID must not be null" ); synchronized (this .lock) { Set<String> set = this .userSessionIds.get(user); if (set == null ) { this .userSessionIds.put(user, new CopyOnWriteArraySet <>()); } set.add(sessionId); } } public void unregisterSessionId (String user, String sessionId) { Assert.notNull(user, "User Name must not be null" ); Assert.notNull(sessionId, "Session ID must not be null" ); synchronized (this .lock) { Set set = this .userSessionIds.get(user); if (set != null && set.remove(sessionId) && set.isEmpty()) { this .userSessionIds.remove(user); } } } }
监听 WebSocket
连接建立和关闭事件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 public abstract class BaseSessionEventListener <Event extends AbstractSubProtocolEvent > implements ApplicationListener <Event> { protected final Logger log = LoggerFactory.getLogger(getClass()); @Autowired protected SocketSessionRegistry webAgentSessionRegistry; protected void using (Event event, BiConsumer<String, String> biConsumer) { StompHeaderAccessor sha = StompHeaderAccessor.wrap(event.getMessage()); List<String> shaNativeHeader = sha.getNativeHeader("Authorization" ); String user; if (shaNativeHeader == null || shaNativeHeader.isEmpty()) { user = null ; } else { user = shaNativeHeader.get(0 ); } if (user == null || "" .equals(user) || "undefined" .equals(user) || "null" .equals(user)) { user = SocketSessionRegistry.DIRECT_TOURIST; } String sessionId = sha.getSessionId(); biConsumer.accept(user, sessionId); } }@Component public class SessionConnectEventListener extends BaseSessionEventListener <SessionConnectEvent> { @Override public void onApplicationEvent (SessionConnectEvent event) { using(event, (user, sessionId) -> webAgentSessionRegistry.registerSessionId(user, sessionId)); } }@Component public class SessionDisconnectEventListener extends BaseSessionEventListener <SessionDisconnectEvent> { @Override public void onApplicationEvent (SessionDisconnectEvent event) { using(event, (user, sessionId) -> webAgentSessionRegistry.getAllSessionIds().entrySet().stream() .filter(sse -> sse.getValue().contains(sessionId)) .findFirst() .ifPresent(sse -> { webAgentSessionRegistry.unregisterSessionId(sse.getKey(), sessionId); log.info("Socket 连接断开,用户:{},会话:{}" , sse.getKey(), sessionId); })); } }
客户端在打开和关闭连接的时候需要发送 user 给服务端
这里使用 headers
存放用户认证信息(唯一标识),所以在连接和关闭时要带上请求头
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 stompClient.connect(getHeaders(), function(){ console.log('打开 Socket 连接' ) }) stompClient.disconnect(function () { console.log('断开连接' ); }, getHeaders()); function getHeaders () { return { 'X-Requested-With' : 'X-Requested-With' , 'Authorization' : localStorage.token } }
使用记录的 user -> session id
发送消息给指定的用户
下面是获取到所有已经登录的用户的 WebSocket
连接并发送一条消息
1 2 3 socketSessionRegistry.getAllSessionIds().entrySet().stream() .filter(kv -> !SocketSessionRegistry.DIRECT_TOURIST.equals(kv.getKey())) .forEach(kv -> kv.getValue().forEach(sessionId -> simpMessagingTemplate.convertAndSendToUser(sessionId, "/push/unidirectional/thisClient" , new Person (2L , "琉璃" , false ))));
接受/返回复杂类型的消息(服务端) 其实看起来和刚才是没什么区别的,但 SpringBoot WebSocket
原本就对消息进行了解析/封装,所以我们不需要再去手动转换了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 @Controller public class ComplexMessageSocket { @MessageMapping("/complexMessage") @SendTo("/topic/complexMessage/allClient") public Person complexMessage (Person person) { return new Person ().setName("Mr. " + person.getName()); } }
发送/订阅复杂类型的消息(客户端) 客户端和之前的也差不多,需要注意的就是无论是发送/接受都需要将复杂类型的对象序列化为字符串(JavaScript
原生支持)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 stompClient.subscribe ('/topic/complexMessage/allClient' , (res ) => { console .log ('订阅复杂类型类型的返回消息:{}' , JSON .parse (res.body )) }) stompClient.send ( '/complexMessage' , headers, JSON .stringify ({ id : 17 , name : 'rxliuli' , sex : false , }), )
WebSocket 客户端封装 每次这么一大堆的代码可以封装一下,吾辈也封装了一个 StopmClient
的客户端工具类,如果有什么不好的地方欢迎提出!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 const socketUtil = { maxLen : 10 , currentLen : 0 , timeInterval : 3000 , endpoint : undefined , stompClient : undefined , socket : undefined , connectWebSocket (successFn, errorFn ) { this .socket = new SockJS (this .endpoint ) this .stompClient = Stomp .over (this .socket ) this .stompClient .connect (this .getHeaders (), successFn, (error ) => { if (this .currentLen ++ < this .maxLen ) { console .log (`Socket 连接失败,将在 ${this .timeInterval / 1000 } s 后重试` ) setTimeout (() => this .connectWebSocket (), 3000 ) } else { console .log ('Socket 连接失败次数过多,将不再重试' ) } errorFn (error) }) }, disconnectWebSocket ( ) { if (this .stompClient ) { this .stompClient .disconnect (function ( ) { console .log ('断开连接' ) }, this .getHeaders ()) this .socket .close () } }, getSessionId ( ) { return /\/([^\/]+)\/websocket/ .exec (this .socket ._transport .url )[1 ] }, getHeaders ( ) { return { 'X-Requested-With' : 'X-Requested-With' , Authorization : localStorage .token , } }, sendText (url, body, headers = {} ) { return this .stompClient .send (url, headers, body) }, sendJSON (url, body, headers = {} ) { return this .stompClient .send (url, headers, JSON .stringify (body)) }, subscribeText (url, successFn ) { return this .stompClient .subscribe (url, (res ) => successFn (res)) }, subscribeJSON (url, successFn ) { return this .stompClient .subscribe (url, (res ) => successFn (JSON .parse (res.body )), ) }, unsubscribe (obj ) { if (obj && obj.unsubscribe ) { obj.unsubscribe () } }, }