服务指令专用websocket
This commit is contained in:
parent
523270820b
commit
636d7bc884
|
|
@ -31,7 +31,7 @@ public class WebSocketConfig {
|
||||||
FilterRegistrationBean bean = new FilterRegistrationBean();
|
FilterRegistrationBean bean = new FilterRegistrationBean();
|
||||||
bean.setFilter(websocketFilter());
|
bean.setFilter(websocketFilter());
|
||||||
//TODO 临时注释掉,测试下线上socket总断的问题
|
//TODO 临时注释掉,测试下线上socket总断的问题
|
||||||
bean.addUrlPatterns("/taskCountSocket/*", "/websocket/*","/eoaSocket/*","/eoaNewChatSocket/*", "/newsWebsocket/*", "/dragChannelSocket/*", "/vxeSocket/*");
|
bean.addUrlPatterns("/directive/websocket/*","/taskCountSocket/*", "/websocket/*","/eoaSocket/*","/eoaNewChatSocket/*", "/newsWebsocket/*", "/dragChannelSocket/*", "/vxeSocket/*");
|
||||||
return bean;
|
return bean;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,185 @@
|
||||||
|
package com.nu.websocket;
|
||||||
|
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.jeecg.common.base.BaseMap;
|
||||||
|
import org.jeecg.common.constant.WebsocketConst;
|
||||||
|
import org.jeecg.common.modules.redis.client.JeecgRedisClient;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import javax.websocket.*;
|
||||||
|
import javax.websocket.server.PathParam;
|
||||||
|
import javax.websocket.server.ServerEndpoint;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @Author zmy
|
||||||
|
* @Date 2025-11-7 14:43:17
|
||||||
|
* @Description: 此注解相当于设置访问URL
|
||||||
|
*/
|
||||||
|
@Component
|
||||||
|
@Slf4j
|
||||||
|
@ServerEndpoint("/directive/websocket/{userId}")
|
||||||
|
public class DirectiveWebSocket {
|
||||||
|
|
||||||
|
/**线程安全Map*/
|
||||||
|
private static ConcurrentHashMap<String, Session> sessionPool = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Redis触发监听名字
|
||||||
|
*/
|
||||||
|
public static final String REDIS_TOPIC_NAME = "socketHandler";
|
||||||
|
|
||||||
|
//避免初次调用出现空指针的情况
|
||||||
|
private static JeecgRedisClient jeecgRedisClient;
|
||||||
|
@Autowired
|
||||||
|
private void setJeecgRedisClient(JeecgRedisClient jeecgRedisClient){
|
||||||
|
DirectiveWebSocket.jeecgRedisClient = jeecgRedisClient;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
//==========【websocket接受、推送消息等方法 —— 具体服务节点推送ws消息】========================================================================================
|
||||||
|
@OnOpen
|
||||||
|
public void onOpen(Session session, @PathParam(value = "userId") String userId) {
|
||||||
|
try {
|
||||||
|
sessionPool.put(userId, session);
|
||||||
|
log.debug("【系统 DirectiveWebSocket】有新的连接,总数为:" + sessionPool.size());
|
||||||
|
} catch (Exception e) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@OnClose
|
||||||
|
public void onClose(@PathParam("userId") String userId) {
|
||||||
|
try {
|
||||||
|
sessionPool.remove(userId);
|
||||||
|
log.debug("【系统 DirectiveWebSocket】连接断开,总数为:" + sessionPool.size());
|
||||||
|
} catch (Exception e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* ws推送消息
|
||||||
|
*
|
||||||
|
* @param userId
|
||||||
|
* @param message
|
||||||
|
*/
|
||||||
|
public void pushMessage(String userId, String message) {
|
||||||
|
for (Map.Entry<String, Session> item : sessionPool.entrySet()) {
|
||||||
|
//userId key值= {用户id + "_"+ 登录token的md5串}
|
||||||
|
//TODO vue2未改key新规则,暂时不影响逻辑
|
||||||
|
if (item.getKey().contains(userId)) {
|
||||||
|
Session session = item.getValue();
|
||||||
|
try {
|
||||||
|
//update-begin-author:taoyan date:20211012 for: websocket报错 https://gitee.com/jeecg/jeecg-boot/issues/I4C0MU
|
||||||
|
synchronized (session){
|
||||||
|
log.debug("【系统 DirectiveWebSocket】推送单人消息:" + message);
|
||||||
|
session.getBasicRemote().sendText(message);
|
||||||
|
}
|
||||||
|
//update-end-author:taoyan date:20211012 for: websocket报错 https://gitee.com/jeecg/jeecg-boot/issues/I4C0MU
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error(e.getMessage(),e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* ws遍历群发消息
|
||||||
|
*/
|
||||||
|
public void pushMessage(String message) {
|
||||||
|
try {
|
||||||
|
for (Map.Entry<String, Session> item : sessionPool.entrySet()) {
|
||||||
|
try {
|
||||||
|
item.getValue().getAsyncRemote().sendText(message);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error(e.getMessage(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
log.debug("【系统 DirectiveWebSocket】群发消息:" + message);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error(e.getMessage(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* ws接受客户端消息
|
||||||
|
*/
|
||||||
|
@OnMessage
|
||||||
|
public void onMessage(String message, @PathParam(value = "userId") String userId) {
|
||||||
|
if(!"ping".equals(message) && !WebsocketConst.CMD_CHECK.equals(message)){
|
||||||
|
log.debug("【系统 DirectiveWebSocket】收到客户端消息:" + message);
|
||||||
|
}else{
|
||||||
|
log.debug("【系统 DirectiveWebSocket】收到客户端消息:" + message);
|
||||||
|
//update-begin---author:wangshuai---date:2024-05-07---for:【issues/1161】前端websocket因心跳导致监听不起作用---
|
||||||
|
this.sendMessage(userId, "ping");
|
||||||
|
//update-end---author:wangshuai---date:2024-05-07---for:【issues/1161】前端websocket因心跳导致监听不起作用---
|
||||||
|
}
|
||||||
|
|
||||||
|
// //------------------------------------------------------------------------------
|
||||||
|
// JSONObject obj = new JSONObject();
|
||||||
|
// //业务类型
|
||||||
|
// obj.put(WebsocketConst.MSG_CMD, WebsocketConst.CMD_CHECK);
|
||||||
|
// //消息内容
|
||||||
|
// obj.put(WebsocketConst.MSG_TXT, "心跳响应");
|
||||||
|
// this.pushMessage(userId, obj.toJSONString());
|
||||||
|
// //------------------------------------------------------------------------------
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 配置错误信息处理
|
||||||
|
*
|
||||||
|
* @param session
|
||||||
|
* @param t
|
||||||
|
*/
|
||||||
|
@OnError
|
||||||
|
public void onError(Session session, Throwable t) {
|
||||||
|
log.warn("【系统 DirectiveWebSocket】消息出现错误");
|
||||||
|
t.printStackTrace();
|
||||||
|
}
|
||||||
|
//==========【系统 WebSocket接受、推送消息等方法 —— 具体服务节点推送ws消息】========================================================================================
|
||||||
|
|
||||||
|
|
||||||
|
//==========【采用redis发布订阅模式——推送消息】========================================================================================
|
||||||
|
/**
|
||||||
|
* 后台发送消息到redis
|
||||||
|
*
|
||||||
|
* @param message
|
||||||
|
*/
|
||||||
|
public void sendMessage(String message) {
|
||||||
|
//log.debug("【系统 WebSocket】广播消息:" + message);
|
||||||
|
BaseMap baseMap = new BaseMap();
|
||||||
|
baseMap.put("userId", "");
|
||||||
|
baseMap.put("message", message);
|
||||||
|
jeecgRedisClient.sendMessage(DirectiveWebSocket.REDIS_TOPIC_NAME, baseMap);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 此为单点消息 redis
|
||||||
|
*
|
||||||
|
* @param userId
|
||||||
|
* @param message
|
||||||
|
*/
|
||||||
|
public void sendMessage(String userId, String message) {
|
||||||
|
BaseMap baseMap = new BaseMap();
|
||||||
|
baseMap.put("userId", userId);
|
||||||
|
baseMap.put("message", message);
|
||||||
|
jeecgRedisClient.sendMessage(DirectiveWebSocket.REDIS_TOPIC_NAME, baseMap);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 此为单点消息(多人) redis
|
||||||
|
*
|
||||||
|
* @param userIds
|
||||||
|
* @param message
|
||||||
|
*/
|
||||||
|
public void sendMessage(String[] userIds, String message) {
|
||||||
|
for (String userId : userIds) {
|
||||||
|
sendMessage(userId, message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//=======【采用redis发布订阅模式——推送消息】==========================================================================================
|
||||||
|
|
||||||
|
}
|
||||||
Loading…
Reference in New Issue