ChannelContextUtils.java
package com.zpx.chirp.websocket;

import com.zpx.chirp.entity.constants.Constants;
import com.zpx.chirp.entity.dto.MessageSendDto;
import com.zpx.chirp.entity.dto.WsInitData;
import com.zpx.chirp.entity.enums.MessageTypeEnum;
import com.zpx.chirp.entity.enums.UserContactApplyStatusEnum;
import com.zpx.chirp.entity.enums.UserContactTypeEnum;
import com.zpx.chirp.entity.po.ChatMessage;
import com.zpx.chirp.entity.po.ChatSessionUser;
import com.zpx.chirp.entity.po.UserInfo;
import com.zpx.chirp.entity.query.ChatMessageQuery;
import com.zpx.chirp.entity.query.UserContactApplyQuery;
import com.zpx.chirp.mapper.*;
import com.zpx.chirp.entity.query.ChatSessionUserQuery;
import com.zpx.chirp.redis.RedisComponent;
import com.zpx.chirp.service.ChatSessionUserService;
import com.zpx.chirp.utils.JsonUtils;
import com.zpx.chirp.utils.StringTools;
import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.GlobalEventExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.time.chrono.IsoChronology;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

/**
 * <p>
 * 描述: 把channel管理起来 ws连接起来要做的初始化工作 ws通道工具类
 * </p>
 *
 * @author Dorain
 * @version 1.0.0
 * @since 2025/3/3
 */

@Component
public class ChannelContextUtils {
    private static final Logger logger = LoggerFactory.getLogger(ChannelContextUtils.class);

    //定义一个全局的hashmap放到内存管理
    private static final ConcurrentHashMap<String, Channel> USER_CONTEXT_MAP = new ConcurrentHashMap<>();
    private static final ConcurrentHashMap<String, ChannelGroup> GROUP_CONTEXT_MAP = new ConcurrentHashMap<>();

    @Resource
    private RedisComponent redisComponent;

    @Resource
    private UserInfoMapper userInfoMapper;


    @Resource
    private ChatSessionUserMapper chatSessionUserMapper;

    @Resource
    private ChatMessageMapper chatMessageMapper;

    @Resource
    private UserContactApplyMapper userContactApplyMapper;


    public void addContext(String userId, Channel channel) {

        String channelId = channel.id().toString();
        logger.info("channelId:{}", channelId);
        AttributeKey attributeKey = null;
        if (!AttributeKey.exists(channelId)) {
            attributeKey = AttributeKey.newInstance(channelId);
        } else {
            attributeKey = AttributeKey.valueOf(channelId);
        }
        channel.attr(attributeKey).set(userId);//设置key 再设置id 通过channel获取UserId

        //拿到所有联系人
        List<String> contactIdList = redisComponent.getUserContactList(userId);
        for (String groupId : contactIdList) {
            if (groupId.startsWith(UserContactTypeEnum.GROUP.getPrefix())) {
                add2Group(groupId, channel);
            }
        }

        USER_CONTEXT_MAP.put(userId, channel);
        redisComponent.saveUserHeartBeat(userId);

        //用户已经连上来了

        //更新用户最后连接时间
        UserInfo updateUserInfo = new UserInfo();
        updateUserInfo.setLastLoginTime(new Date());
        userInfoMapper.updateByUserId(updateUserInfo, userId);

        //给用户发送消息
        UserInfo userInfo = userInfoMapper.selectByUserId(userId);
        Long sourceLastOffTime = userInfo.getLastOffTime();
        Long lastOffTime = sourceLastOffTime;
        //查询最近三天的信息
        if (sourceLastOffTime != null && System.currentTimeMillis() - Constants.MILLISECOND_3DAYS_AGO > sourceLastOffTime) {
            //离线时间超过3天以前的   最大也是查三天以前的
            lastOffTime = Constants.MILLISECOND_3DAYS_AGO;
        }
        //1.查询会话信息 查询所有的会话信息 保证换了设备会话会同步 优于微信
        ChatSessionUserQuery sessionUserQuery = new ChatSessionUserQuery();
        sessionUserQuery.setUserId(userId);
        sessionUserQuery.setOrderBy("last_receive_time desc");
        List<ChatSessionUser> chatSessionUserList = chatSessionUserMapper.selectList1(sessionUserQuery);

        WsInitData wsInitData = new WsInitData();
        wsInitData.setChatSessionList(chatSessionUserList);
        //2.查询聊天记录 要查登陆用户相关的所有联系人的最近三天聊天记录 放在一个contactIdList集合里
        //这里就是因为我们只需要查是发给自己的信息所以contactIdList就是我们的id和我们加入的群组的id
        List<String> groupList = contactIdList.stream().filter(item -> item.startsWith(UserContactTypeEnum.GROUP.getPrefix())).collect(Collectors.toList());
        groupList.add(userId);
        ChatMessageQuery messageQuery = new ChatMessageQuery();
        messageQuery.setContactIdList(groupList);
        //最近三天的消息  太久了就取三天内的 不超过三天就取实际离线时间的消息记录
        messageQuery.setLastReceiveTime(lastOffTime);
        List<ChatMessage> chatMessageList = chatMessageMapper.selectList1(messageQuery);
        wsInitData.setChatMessageList(chatMessageList);
        //3.查询好友申请
        UserContactApplyQuery applyQuery = new UserContactApplyQuery();
        applyQuery.setReceiveUserId(userId);
        applyQuery.setStatus(UserContactApplyStatusEnum.INIT.getStatus());
        applyQuery.setLastApplyTimestamp(lastOffTime);  //最近三天离线时间段的申请  超过的不管
        Integer applyCount = userContactApplyMapper.selectCount1(applyQuery);
        wsInitData.setApplyCount(applyCount);


        //服务端发送消息 谁登陆发给谁
        MessageSendDto messageSendDto = new MessageSendDto();
        messageSendDto.setMessageType(MessageTypeEnum.INIT.getType());
        messageSendDto.setContactId(userId);  //发给当前登陆用户消息 最近三天的消息
        messageSendDto.setExtendData(wsInitData);
        sendMsg(messageSendDto, userId);
    }

    /* 也就是说一个channel就代表一个用户, 
    group就是channel的一个集合。 
    集合中有一个channel发送了消息,在集合中的每个channel都能收到消息*/
    private void add2Group(String groupId, Channel channel) {
        ChannelGroup group = GROUP_CONTEXT_MAP.get(groupId);
        if (group == null) {
            group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
            GROUP_CONTEXT_MAP.put(groupId, group);
        }

        if (channel == null) {
            return;
        }
        group.add(channel);
    }


    public void removeContext(Channel channel) {
        //从channel中获取userId
        Attribute<String> attribute = channel.attr(AttributeKey.valueOf(channel.id().toString()));
        String userId = attribute.get();
        if (userId != null) {
            USER_CONTEXT_MAP.remove(userId);
        }

        redisComponent.removeUserHeartBeat(userId);
        //更新用户最后离线时间
        UserInfo updateUserInfo = new UserInfo();
        updateUserInfo.setLastOffTime(System.currentTimeMillis());
        userInfoMapper.updateByUserId(updateUserInfo, userId);
    }

    public void sendMessage(MessageSendDto messageSendDto) {
        //看看是单聊还是群聊
        UserContactTypeEnum contactTypeEnum = UserContactTypeEnum.getByPrefix(messageSendDto.getContactId());
        switch (contactTypeEnum) {
            case USER:
                send2User(messageSendDto);
                break;

            case GROUP:
                send2Group(messageSendDto);
                break;


        }
    }

    //单聊 发送给用户
    private void send2User(MessageSendDto messageSendDto) {
        String contactId = messageSendDto.getContactId();
        if (StringTools.isEmpty(contactId)) {
            return;
        }
        sendMsg(messageSendDto, contactId);
        //强制下线
        if (MessageTypeEnum.FORCE_OFF_LINE.getType().equals(messageSendDto.getMessageType())) {
            // 关闭通道
            closeContext(contactId);
        }


    }

    public void closeContext(String userId) {
        if (StringTools.isEmpty(userId)) {
            return;
        }
        redisComponent.cleanUserTokenByUserId(userId);
        Channel channel = USER_CONTEXT_MAP.get(userId);
        if (channel == null) {
            return;
        }
        channel.close();

    }


    //发送给群组  channel对象不一样 要俩个方法

    private void send2Group(MessageSendDto messageSendDto) {
        if (StringTools.isEmpty(messageSendDto.getContactId())) {
            return;
        }
        ChannelGroup channelGroup = GROUP_CONTEXT_MAP.get(messageSendDto.getContactId());
        if (channelGroup == null) {
            return;
        }
        //直接往组里发消息
        channelGroup.writeAndFlush(new TextWebSocketFrame(JsonUtils.convertObj2Json(messageSendDto)));

        //移除群聊
        MessageTypeEnum messageTypeEnum=MessageTypeEnum.getByType(messageSendDto.getMessageType());
        if (MessageTypeEnum.LEAVE_GROUP==messageTypeEnum ||MessageTypeEnum.REMOVE_GROUP==messageTypeEnum){
            String userId =(String) messageSendDto.getExtendData();
            //从缓存中删除联系人
            redisComponent.removeUserContact(userId,messageSendDto.getContactId());
            Channel channel = USER_CONTEXT_MAP.get(userId);
            if (channel==null){
                return;
            }
            channelGroup.remove(channel);

        }
        //解散群聊的时候
       if (MessageTypeEnum.DISSOLUTION_GROUP==messageTypeEnum){
           GROUP_CONTEXT_MAP.remove(messageSendDto.getContactId());
           channelGroup.close();
       }
    }

    //发送消息
    public void sendMsg(MessageSendDto messageSendDto, String receiverId) {
        if (receiverId == null) {
            return;
        }
        Channel userChannel = USER_CONTEXT_MAP.get(receiverId); //之前把userId和channel放在map里
        if (userChannel == null) {
            return;
        }

        //好友申请的时候就不用这样处理 比如添加好友时对方同意了就会有一条消息 这条消息也要转发到自己的客户端
        if (MessageTypeEnum.ADD_FRIEND_SELF.getType().equals(messageSendDto.getMessageType())) {
            UserInfo userInfo = (UserInfo) messageSendDto.getExtendData();
            messageSendDto.setMessageType(MessageTypeEnum.ADD_FRIEND.getType());
            messageSendDto.setContactId(userInfo.getUserId());
            messageSendDto.setContactName(userInfo.getNickName());
            messageSendDto.setExtendData(null);


        } else {
            //相对于客户端而言 用户的联系人就是消息发送人 ,
            // 对于用户而言,联系人就是消息接收者,用户是消息发送人 所以这里转一下再发送,因为sendMsg方法是公用的
            messageSendDto.setContactId(messageSendDto.getSendUserId());
            messageSendDto.setContactName(messageSendDto.getSendUserNickName());
        }

        userChannel.writeAndFlush(new TextWebSocketFrame(JsonUtils.convertObj2Json(messageSendDto)));  //把对象转成json发送


    }


    public void addUser2Group(String userId, String groupId) {
        Channel channel = USER_CONTEXT_MAP.get(userId);
        add2Group(groupId, channel);
    }
}