package com.zpx.chirp.websocket; import com.zpx.chirp.entity.constants.Constants; import com.zpx.chirp.entity.dto.MessageSendDto; import com.zpx.chirp.entity.dto.WsInitData; import com.zpx.chirp.entity.enums.MessageTypeEnum; import com.zpx.chirp.entity.enums.UserContactApplyStatusEnum; import com.zpx.chirp.entity.enums.UserContactTypeEnum; import com.zpx.chirp.entity.po.ChatMessage; import com.zpx.chirp.entity.po.ChatSessionUser; import com.zpx.chirp.entity.po.UserInfo; import com.zpx.chirp.entity.query.ChatMessageQuery; import com.zpx.chirp.entity.query.UserContactApplyQuery; import com.zpx.chirp.mapper.*; import com.zpx.chirp.entity.query.ChatSessionUserQuery; import com.zpx.chirp.redis.RedisComponent; import com.zpx.chirp.service.ChatSessionUserService; import com.zpx.chirp.utils.JsonUtils; import com.zpx.chirp.utils.StringTools; import io.netty.channel.Channel; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.util.Attribute; import io.netty.util.AttributeKey; import io.netty.util.concurrent.GlobalEventExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.time.chrono.IsoChronology; import java.util.Date; import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; /** * <p> * 描述: 把channel管理起来 ws连接起来要做的初始化工作 ws通道工具类 * </p> * * @author Dorain * @version 1.0.0 * @since 2025/3/3 */ @Component public class ChannelContextUtils { private static final Logger logger = LoggerFactory.getLogger(ChannelContextUtils.class); //定义一个全局的hashmap放到内存管理 private static final ConcurrentHashMap<String, Channel> USER_CONTEXT_MAP = new ConcurrentHashMap<>(); private static final ConcurrentHashMap<String, ChannelGroup> GROUP_CONTEXT_MAP = new ConcurrentHashMap<>(); @Resource private RedisComponent redisComponent; @Resource private UserInfoMapper userInfoMapper; @Resource private ChatSessionUserMapper chatSessionUserMapper; @Resource private ChatMessageMapper chatMessageMapper; @Resource private UserContactApplyMapper userContactApplyMapper; public void addContext(String userId, Channel channel) { String channelId = channel.id().toString(); logger.info("channelId:{}", channelId); AttributeKey attributeKey = null; if (!AttributeKey.exists(channelId)) { attributeKey = AttributeKey.newInstance(channelId); } else { attributeKey = AttributeKey.valueOf(channelId); } channel.attr(attributeKey).set(userId);//设置key 再设置id 通过channel获取UserId //拿到所有联系人 List<String> contactIdList = redisComponent.getUserContactList(userId); for (String groupId : contactIdList) { if (groupId.startsWith(UserContactTypeEnum.GROUP.getPrefix())) { add2Group(groupId, channel); } } USER_CONTEXT_MAP.put(userId, channel); redisComponent.saveUserHeartBeat(userId); //用户已经连上来了 //更新用户最后连接时间 UserInfo updateUserInfo = new UserInfo(); updateUserInfo.setLastLoginTime(new Date()); userInfoMapper.updateByUserId(updateUserInfo, userId); //给用户发送消息 UserInfo userInfo = userInfoMapper.selectByUserId(userId); Long sourceLastOffTime = userInfo.getLastOffTime(); Long lastOffTime = sourceLastOffTime; //查询最近三天的信息 if (sourceLastOffTime != null && System.currentTimeMillis() - Constants.MILLISECOND_3DAYS_AGO > sourceLastOffTime) { //离线时间超过3天以前的 最大也是查三天以前的 lastOffTime = Constants.MILLISECOND_3DAYS_AGO; } //1.查询会话信息 查询所有的会话信息 保证换了设备会话会同步 优于微信 ChatSessionUserQuery sessionUserQuery = new ChatSessionUserQuery(); sessionUserQuery.setUserId(userId); sessionUserQuery.setOrderBy("last_receive_time desc"); List<ChatSessionUser> chatSessionUserList = chatSessionUserMapper.selectList1(sessionUserQuery); WsInitData wsInitData = new WsInitData(); wsInitData.setChatSessionList(chatSessionUserList); //2.查询聊天记录 要查登陆用户相关的所有联系人的最近三天聊天记录 放在一个contactIdList集合里 //这里就是因为我们只需要查是发给自己的信息所以contactIdList就是我们的id和我们加入的群组的id List<String> groupList = contactIdList.stream().filter(item -> item.startsWith(UserContactTypeEnum.GROUP.getPrefix())).collect(Collectors.toList()); groupList.add(userId); ChatMessageQuery messageQuery = new ChatMessageQuery(); messageQuery.setContactIdList(groupList); //最近三天的消息 太久了就取三天内的 不超过三天就取实际离线时间的消息记录 messageQuery.setLastReceiveTime(lastOffTime); List<ChatMessage> chatMessageList = chatMessageMapper.selectList1(messageQuery); wsInitData.setChatMessageList(chatMessageList); //3.查询好友申请 UserContactApplyQuery applyQuery = new UserContactApplyQuery(); applyQuery.setReceiveUserId(userId); applyQuery.setStatus(UserContactApplyStatusEnum.INIT.getStatus()); applyQuery.setLastApplyTimestamp(lastOffTime); //最近三天离线时间段的申请 超过的不管 Integer applyCount = userContactApplyMapper.selectCount1(applyQuery); wsInitData.setApplyCount(applyCount); //服务端发送消息 谁登陆发给谁 MessageSendDto messageSendDto = new MessageSendDto(); messageSendDto.setMessageType(MessageTypeEnum.INIT.getType()); messageSendDto.setContactId(userId); //发给当前登陆用户消息 最近三天的消息 messageSendDto.setExtendData(wsInitData); sendMsg(messageSendDto, userId); } /* 也就是说一个channel就代表一个用户, group就是channel的一个集合。 集合中有一个channel发送了消息,在集合中的每个channel都能收到消息*/ private void add2Group(String groupId, Channel channel) { ChannelGroup group = GROUP_CONTEXT_MAP.get(groupId); if (group == null) { group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); GROUP_CONTEXT_MAP.put(groupId, group); } if (channel == null) { return; } group.add(channel); } public void removeContext(Channel channel) { //从channel中获取userId Attribute<String> attribute = channel.attr(AttributeKey.valueOf(channel.id().toString())); String userId = attribute.get(); if (userId != null) { USER_CONTEXT_MAP.remove(userId); } redisComponent.removeUserHeartBeat(userId); //更新用户最后离线时间 UserInfo updateUserInfo = new UserInfo(); updateUserInfo.setLastOffTime(System.currentTimeMillis()); userInfoMapper.updateByUserId(updateUserInfo, userId); } public void sendMessage(MessageSendDto messageSendDto) { //看看是单聊还是群聊 UserContactTypeEnum contactTypeEnum = UserContactTypeEnum.getByPrefix(messageSendDto.getContactId()); switch (contactTypeEnum) { case USER: send2User(messageSendDto); break; case GROUP: send2Group(messageSendDto); break; } } //单聊 发送给用户 private void send2User(MessageSendDto messageSendDto) { String contactId = messageSendDto.getContactId(); if (StringTools.isEmpty(contactId)) { return; } sendMsg(messageSendDto, contactId); //强制下线 if (MessageTypeEnum.FORCE_OFF_LINE.getType().equals(messageSendDto.getMessageType())) { // 关闭通道 closeContext(contactId); } } public void closeContext(String userId) { if (StringTools.isEmpty(userId)) { return; } redisComponent.cleanUserTokenByUserId(userId); Channel channel = USER_CONTEXT_MAP.get(userId); if (channel == null) { return; } channel.close(); } //发送给群组 channel对象不一样 要俩个方法 private void send2Group(MessageSendDto messageSendDto) { if (StringTools.isEmpty(messageSendDto.getContactId())) { return; } ChannelGroup channelGroup = GROUP_CONTEXT_MAP.get(messageSendDto.getContactId()); if (channelGroup == null) { return; } //直接往组里发消息 channelGroup.writeAndFlush(new TextWebSocketFrame(JsonUtils.convertObj2Json(messageSendDto))); //移除群聊 MessageTypeEnum messageTypeEnum=MessageTypeEnum.getByType(messageSendDto.getMessageType()); if (MessageTypeEnum.LEAVE_GROUP==messageTypeEnum ||MessageTypeEnum.REMOVE_GROUP==messageTypeEnum){ String userId =(String) messageSendDto.getExtendData(); //从缓存中删除联系人 redisComponent.removeUserContact(userId,messageSendDto.getContactId()); Channel channel = USER_CONTEXT_MAP.get(userId); if (channel==null){ return; } channelGroup.remove(channel); } //解散群聊的时候 if (MessageTypeEnum.DISSOLUTION_GROUP==messageTypeEnum){ GROUP_CONTEXT_MAP.remove(messageSendDto.getContactId()); channelGroup.close(); } } //发送消息 public void sendMsg(MessageSendDto messageSendDto, String receiverId) { if (receiverId == null) { return; } Channel userChannel = USER_CONTEXT_MAP.get(receiverId); //之前把userId和channel放在map里 if (userChannel == null) { return; } //好友申请的时候就不用这样处理 比如添加好友时对方同意了就会有一条消息 这条消息也要转发到自己的客户端 if (MessageTypeEnum.ADD_FRIEND_SELF.getType().equals(messageSendDto.getMessageType())) { UserInfo userInfo = (UserInfo) messageSendDto.getExtendData(); messageSendDto.setMessageType(MessageTypeEnum.ADD_FRIEND.getType()); messageSendDto.setContactId(userInfo.getUserId()); messageSendDto.setContactName(userInfo.getNickName()); messageSendDto.setExtendData(null); } else { //相对于客户端而言 用户的联系人就是消息发送人 , // 对于用户而言,联系人就是消息接收者,用户是消息发送人 所以这里转一下再发送,因为sendMsg方法是公用的 messageSendDto.setContactId(messageSendDto.getSendUserId()); messageSendDto.setContactName(messageSendDto.getSendUserNickName()); } userChannel.writeAndFlush(new TextWebSocketFrame(JsonUtils.convertObj2Json(messageSendDto))); //把对象转成json发送 } public void addUser2Group(String userId, String groupId) { Channel channel = USER_CONTEXT_MAP.get(userId); add2Group(groupId, channel); } }