SpringBoot 集成 WebSocket

本文最后更新于:2020年12月30日 凌晨

场景 & 需求

  1. 客户端发送请求后,服务端进行处理后可以对所有的客户端进行 广播
  2. 服务端可以在任何时候主动对所有客户端进行 广播
  3. 客户端发送请求后,服务端进行处理后可以对指定客户端进行 点对点推送
  4. 服务端可以在任何时候主动对指定客户端进行 点对点推送
  5. 服务端可以在任何时候主动对指定某些客户端进行 广播
  6. 服务端可以识别客户端(状态),并以此进行 点对点推送

前置要求

本文假设你已经了解或知道以下技能,尤其而且是勾选的内容。

  • Java
  • Maven
  • SpringBoot

引入依赖

创建一个 SpringBoot 项目,并添加 spring-boot-starter-websocket 依赖

1
2
3
4
5
<!--spring boot web socket-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

配置 SpringBoot WebSocket 支持

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
* 配置 SpringBoot WebSocket 支持
*
* @author rxliuli
*/
@Configuration
@EnableWebSocketMessageBroker
public class SpringWebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {
/**
* 注册一个 Socket 端点
*
* @param stompEndpointRegistry stomp 端点注册表
*/
@Override
public void registerStompEndpoints(StompEndpointRegistry stompEndpointRegistry) {
stompEndpointRegistry.addEndpoint("/endpoint")
//设置允许所有源请求(跨域)
.setAllowedOrigins("*")
.withSockJS();
}

/**
* 注册一些广播消息代理
*
* @param registry 消息代理注册对象
*/
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
//注册简单代理(里面是前缀)
//注:默认 topic 是主题(广播),user 则是用户(点对点)
registry.enableSimpleBroker("/topic", "/user");
}
}

双向广播服务端

客户端发送请求后,服务端进行处理后可以对所有的客户端进行 广播

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* 双向广播控制器
*
* @author rxliuli
*/
@Controller
public class BilateralBroadcastingSocket {
/**
* 广播推送
* 注解 @Payload 是为了绑定消息到参数 text 上
*
* @param text 简单的文本信息
* @param sessionId 当前请求 socket 会话 id
* @return 会话 id 和消息内容
*/
@MessageMapping(value = "/talk")
@SendTo("/topic/broadcasting/bilateral/allClient")
public String talk(@Payload String text, @Header("simpSessionId") String sessionId) throws InterruptedException {
//模拟处理其他事情
Thread.sleep(1000L);
return "[ " + sessionId + "] 说: [" + text + "]";
}
}

双向广播客户端

向服务端发送消息,并监听服务端的广播。客户端发送消息与监听是分离的,也可以只向服务端发送消息而不监听广播,或者只接收广播不发送消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<meta http-equiv="X-UA-Compatible" content="ie=edge" />
<title>Document</title>
</head>

<body>
<script
type="application/javascript"
src="https://cdn.bootcss.com/sockjs-client/1.1.4/sockjs.min.js"
></script>
<script
type="application/javascript"
src="https://cdn.bootcss.com/stomp.js/2.3.3/stomp.min.js"
></script>
<script>
let socket = new SockJS('http://127.0.0.1:8080/endpoint')
stompClient = Stomp.over(socket)
stompClient.connect(
{},
// 连接成功回调函数
(frame) => {
console.log('服务端 Socket 连接建立')

// 获取 websocket 连接的 sessionId
const sessionId = /\/([^\/]+)\/websocket/.exec(
socket._transport.url,
)[1]
console.log('connected, session id: ' + sessionId)

// 订阅广播消息(双向通信)
// 这里是关键(订阅了服务端的 topic)
stompClient.subscribe(
'/topic/broadcasting/bilateral/allClient',
(res) => {
console.log(`[广播(双向通信)]: ${res.body}`)
},
)

// 发送请求
send()
},
(error) => {
console.log('Socket 连接失败')
},
)

function send() {
// 发送一个消息到服务端
// 发送消息到服务端
var headers = {}
var body = {
message: '消息内容',
}
stompClient.send('/talk', headers, JSON.stringify(body))
}

/**
* 监听窗口关闭事件,窗口关闭前,主动关闭连接,防止连接还没断开就关闭窗口,server 端会抛异常
*/
window.onbeforeunload = function () {
if (stompClient !== null) {
stompClient.disconnect()
socket.close()
}
console.log('断开连接')
}
</script>
</body>
</html>

单向广播服务端

从服务端推送消息到所有客户端,是单向推送到客户端的,不接受从客户端的输入。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* 单向广播控制器
*
* @author rxliuli
*/
@Controller
public class UnidirectionalBroadcastingSocket {
/**
* 从服务端推送消息到所有客户端
* 这是单向推送到客户端的,不接受从客户端的输入
*/
@SendTo("/topic/broadcasting/unidirectional/allClient")
public Object broadcasting() {
return null;
}
}

/**
* 使用 Scheduled 不停的推送信息
*
* @author rxliuli
*/
@Component
public class ScheduledRefreshJob {
@Autowired
private SimpMessagingTemplate simpMessagingTemplate;

/**
* 不停地推送消息到客户端
*/
@Scheduled(fixedDelay = 10 * 1000)
public void scheduledBroadcasting() {
simpMessagingTemplate.convertAndSend("/topic/broadcasting/unidirectional/allClient", new Person(1L, "rx", false));
}
}

单向广播客户端

客户端只需要添加一个监听器就好了,不需要也不能向服务端发送消息。

1
2
3
4
5
6
7
// 订阅广播消息(服务端单向推送)
const subscription_broadcast = stompClient.subscribe(
'/topic/broadcasting/unidirectional/allClient',
(response) => {
console.log(`[广播(服务端单向推送)]: ${response.body}`)
},
)

点对点推送服务端

服务端使用 @SendToUser(path) 向单个客户端推送消息,这里的 @Header("simpSessionId") 指的是从客户端的请求头中的 simpSessionId 参数赋值给 sessionIdWebSocket 会话 ID,和用户 Session 不同,是每一个 WebSocket 唯一的 #即和用户不是一一对应)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* 双向点对点推送控制器
*
* @author rxliuli
*/
@Controller
public class BilateralPushSocket {
/**
* 点对点推送(双向通信)
*
* @param text 消息
* @param sessionId 会话 id
* @return 推送到当前会话的消息
*/
@MessageMapping("/speak")
@SendToUser("/push/bilateral/thisClient")
public String speak(@Payload String text, @Header("simpSessionId") String sessionId) throws InterruptedException {
//模拟处理其他事情
Thread.sleep(1000L);
return "[ " + sessionId + "] send: [" + text + "]";
}
}

点对点推送客户端

客户端请求的路径需要注意一下,是以 /user/${sessionId} 开头,后面才是 @SendToUser(path) 中设置的 path

1
2
3
4
// 订阅私人消息(双向通信)
stompClient.subscribe(`/user/${sessionId}/push/bilateral/thisClient`, (res) => {
console.log(`[点对点推送(双向通信)]: ${res.body}`)
})

单向点对点推送服务端

其实和上面双向的点对点推送没什么太大的差别,就是只用 @SendToUser(path) 而不用 @MessageMapping(path) 了而已

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* 单向点对点推送服务端
*
* @author rxliuli
*/
@Controller
public class UnidirectionalPushSocket {
/**
* 从服务端推送消息到所有客户端
* 这是单向推送到客户端的,不接受从客户端的输入
*/
@SendToUser("/push/unidirectional/thisClient")
public Object push() {
return null;
}
}

/**
* 使用 Scheduled 不停的推送信息
*
* @author rxliuli
*/
@Component
public class ScheduledRefreshJob {
@Autowired
private SimpMessagingTemplate simpMessagingTemplate;

/**
* 不停推送消息到某个指定的客户端
*/
@Scheduled(fixedDelay = 10 * 1000)
public void scheduledPush() {
simpMessagingTemplate.convertAndSendToUser("r2qspi4s", "/push/unidirectional/thisClient", new Person(2L, "琉璃", false));
}
}

单向点对点推送客户端

客户端和上面的双向点对点推送基本一致(完全一样好么?!)

1
2
3
4
5
6
7
// 订阅私人消息(单向通信)
stompClient.subscribe(
`/user/${sessionId}/push/unidirectional/thisClient`,
(res) => {
console.log(`[点对点推送(单向通信)]:${res.body}`)
},
)

记录 user -> Socket 会话对应的映射表

上面的点对点推送客户端几乎是没什么用处的(尤其而且是 单向点对点推送),因为每次创建的 Socket 连接都会变化,而没有与用户建立对应关系的话怎无法知道哪个用户对应的哪个人,也就不能发送消息给指定的用户(非 Socket Session Id)了

  1. 首先需要一个记录用户 Socket Session Id 的类,并注册为 SpringBoot 的组件。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
/**
* 用户 session 记录类
*
* @author rxliuli
*/
@Component
public class SocketSessionRegistry {
/**
* 未登录的用户默认存储的 user id
*/
public static final String DIRECT_TOURIST = "DIRECT_TOURIST";
/**
* 这个集合存储 用户 id -> session 列表
* 单个用户可能打开多个页面,就会出现多个 Socket 会话
*/
private final ConcurrentMap<String, Set<String>> userSessionIds = new ConcurrentHashMap<>();
private final Object lock = new Object();

/**
* 根据 user id 获取 sessionId
*
* @param user 用户 id
* @return 用户关联的 sessionId
*/
public Set<String> getSessionIds(String user) {
Set<String> set = this.userSessionIds.get(user);
return set != null ? set : Collections.emptySet();
}

/**
* 获取所有 session
*
* @return 所有的 用户 id -> session 列表
*/
public ConcurrentMap<String, Set<String>> getAllSessionIds() {
return this.userSessionIds;
}

/**
* 根据用户 id 注册一个 session
*
* @param user 用户 id
* @param sessionId Socket 会话 id
*/
public void registerSessionId(String user, String sessionId) {
Assert.notNull(user, "User must not be null");
Assert.notNull(sessionId, "Session ID must not be null");
synchronized (this.lock) {
Set<String> set = this.userSessionIds.get(user);
if (set == null) {
this.userSessionIds.put(user, new CopyOnWriteArraySet<>());
}
set.add(sessionId);
}
}

/**
* 根据用户 id 删除一个 session
*
* @param user 用户 id
* @param sessionId Socket 会话 id
*/
public void unregisterSessionId(String user, String sessionId) {
Assert.notNull(user, "User Name must not be null");
Assert.notNull(sessionId, "Session ID must not be null");
synchronized (this.lock) {
Set set = this.userSessionIds.get(user);
if (set != null && set.remove(sessionId) && set.isEmpty()) {
this.userSessionIds.remove(user);
}
}
}
}
  1. 监听 WebSocket 连接建立和关闭事件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
/**
* 会话事件监听基类
*
* @author rxliuli
*/
public abstract class BaseSessionEventListener<Event extends AbstractSubProtocolEvent> implements ApplicationListener<Event> {
protected final Logger log = LoggerFactory.getLogger(getClass());
@Autowired
protected SocketSessionRegistry webAgentSessionRegistry;

/**
* 计算出 user id 和 session id 并传入到自定义的函数中
*
* @param event 事件
* @param biConsumer 自定义的操作
*/
protected void using(Event event, BiConsumer<String, String> biConsumer) {
StompHeaderAccessor sha = StompHeaderAccessor.wrap(event.getMessage());
//login get from browser
List<String> shaNativeHeader = sha.getNativeHeader("Authorization");
String user;
if (shaNativeHeader == null || shaNativeHeader.isEmpty()) {
user = null;
} else {
user = shaNativeHeader.get(0);
}
//如果当前用户没有登录(没有认证信息),就添加到游客里面
if (user == null || "".equals(user) || "undefined".equals(user) || "null".equals(user)) {
user = SocketSessionRegistry.DIRECT_TOURIST;
}
String sessionId = sha.getSessionId();
biConsumer.accept(user, sessionId);
}
}

/**
* Socket 连接建立监听
* 用于 session 注册 以及 key 值获取
*
* @author rxliuli
*/
@Component
public class SessionConnectEventListener extends BaseSessionEventListener<SessionConnectEvent> {
@Override
public void onApplicationEvent(SessionConnectEvent event) {
using(event, (user, sessionId) -> webAgentSessionRegistry.registerSessionId(user, sessionId));
}
}

/**
* Socket 会话断开监听
*
* @author rxliuli
*/
@Component
public class SessionDisconnectEventListener extends BaseSessionEventListener<SessionDisconnectEvent> {
@Override
public void onApplicationEvent(SessionDisconnectEvent event) {
//这里先根据 session id 查询出 user,然后删除对应的会话 id
//前端无法传递 token 到这里却是只能出此下策了
using(event, (user, sessionId) -> webAgentSessionRegistry.getAllSessionIds().entrySet().stream()
.filter(sse -> sse.getValue().contains(sessionId))
.findFirst()
.ifPresent(sse -> {
webAgentSessionRegistry.unregisterSessionId(sse.getKey(), sessionId);
log.info("Socket 连接断开,用户:{},会话:{}", sse.getKey(), sessionId);
}));
}
}
  1. 客户端在打开和关闭连接的时候需要发送 user 给服务端

这里使用 headers 存放用户认证信息(唯一标识),所以在连接和关闭时要带上请求头

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
stompClient.connect(getHeaders(), function(){
console.log('打开 Socket 连接')
})
// TODO 这里还有一些问题,无法带上 headers 到后端
stompClient.disconnect(function () {
console.log('断开连接');
}, getHeaders());

/**
* 获取一个认证的 headers 信息
* @return {{"X-Requested-With": string, Authorization: any}} 含有认证信息的 headers 对象
*/
function getHeaders() {
return {
'X-Requested-With': 'X-Requested-With',
'Authorization': localStorage.token
}
}
  1. 使用记录的 user -> session id 发送消息给指定的用户

下面是获取到所有已经登录的用户的 WebSocket 连接并发送一条消息

1
2
3
socketSessionRegistry.getAllSessionIds().entrySet().stream()
.filter(kv -> !SocketSessionRegistry.DIRECT_TOURIST.equals(kv.getKey()))
.forEach(kv -> kv.getValue().forEach(sessionId -> simpMessagingTemplate.convertAndSendToUser(sessionId, "/push/unidirectional/thisClient", new Person(2L, "琉璃", false))));

接受/返回复杂类型的消息(服务端)

其实看起来和刚才是没什么区别的,但 SpringBoot WebSocket 原本就对消息进行了解析/封装,所以我们不需要再去手动转换了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* 接受和发送复杂类型的消息
*
* @author rxliuli
*/
@Controller
public class ComplexMessageSocket {
/**
* 接收/返回复杂类型 Person 的对象
*
* @param person Person 类对象
* @return Person 类对象
*/
@MessageMapping("/complexMessage")
@SendTo("/topic/complexMessage/allClient")
public Person complexMessage(Person person) {
return new Person().setName("Mr. " + person.getName());
}
}

发送/订阅复杂类型的消息(客户端)

客户端和之前的也差不多,需要注意的就是无论是发送/接受都需要将复杂类型的对象序列化为字符串(JavaScript 原生支持)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 订阅返回复杂类型的消息
stompClient.subscribe('/topic/complexMessage/allClient', (res) => {
console.log('订阅复杂类型类型的返回消息:{}', JSON.parse(res.body))
})

// 发送一个复杂类型的消息
stompClient.send(
'/complexMessage',
headers,
JSON.stringify({
id: 17,
name: 'rxliuli',
sex: false,
}),
)

WebSocket 客户端封装

每次这么一大堆的代码可以封装一下,吾辈也封装了一个 StopmClient 的客户端工具类,如果有什么不好的地方欢迎提出!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
/**
* websocket 连接的工具类
* 该工具类依赖于 sockjs-client 与 webstomp-client 两个类库
* 使用方法:
* 1. 设定 endpoint 属性
* 2. 添加连接成功 / 失败的回调函数进行连接
* 3. 订阅 / 发送消息
* 4. 断开连接
*/
const socketUtil = {
//最大重连次数
maxLen: 10,
//当前重连次数
currentLen: 0,
// 每次连接的时间间隔
timeInterval: 3000,
// 连接的 Socket 节点
endpoint: undefined,
// Socket 连接信息
stompClient: undefined,
socket: undefined,
/**
* Socket 连接的方法
*/
connectWebSocket(successFn, errorFn) {
this.socket = new SockJS(this.endpoint)
this.stompClient = Stomp.over(this.socket)
this.stompClient.connect(this.getHeaders(), successFn, (error) => {
if (this.currentLen++ < this.maxLen) {
console.log(`Socket 连接失败,将在 ${this.timeInterval / 1000}s 后重试`)
setTimeout(() => this.connectWebSocket(), 3000)
} else {
console.log('Socket 连接失败次数过多,将不再重试')
}
errorFn(error)
})
},
/**
* 断开连接的方法
*/
disconnectWebSocket() {
if (this.stompClient) {
this.stompClient.disconnect(function () {
console.log('断开连接')
}, this.getHeaders())
this.socket.close()
}
},
/**
* 获取当前 Socket 连接的 session id
*/
getSessionId() {
return /\/([^\/]+)\/websocket/.exec(this.socket._transport.url)[1]
},
/**
* 获取一个认证的 headers 信息
* 该方法可以被覆盖
* @return {{"X-Requested-With": string, Authorization: any}} 含有认证信息的 headers 对象
*/
getHeaders() {
return {
'X-Requested-With': 'X-Requested-With',
Authorization: localStorage.token,
}
},
/**
* 发送简单文本类型的消息
*/
sendText(url, body, headers = {}) {
return this.stompClient.send(url, headers, body)
},
/**
* 发送 json 类型的消息
*/
sendJSON(url, body, headers = {}) {
return this.stompClient.send(url, headers, JSON.stringify(body))
},
/**
* 订阅简单文本类型的消息
*/
subscribeText(url, successFn) {
return this.stompClient.subscribe(url, (res) => successFn(res))
},
/**
* 订阅 json 类型的消息
*/
subscribeJSON(url, successFn) {
return this.stompClient.subscribe(url, (res) =>
successFn(JSON.parse(res.body)),
)
},
/**
* 取消订阅
* @param obj 订阅对象
*/
unsubscribe(obj) {
if (obj && obj.unsubscribe) {
obj.unsubscribe()
}
},
}

SpringBoot 集成 WebSocket
https://blog.rxliuli.com/p/2edc3c5f92714553a1ccb1adb0758aa6/
作者
rxliuli
发布于
2020年4月18日
许可协议