上一篇
Java双系统如何实现即时通讯?
- 后端开发
- 2025-06-06
- 4478
两个系统间实现聊天记录同步可通过共享数据库表、消息队列(如Kafka/RabbitMQ)或API接口交互,核心需设计消息存储结构(含发送者、接收者、内容、时间戳),通过实时推送或定时拉取机制传输数据,并确保消息顺序与事务一致性。
核心实现技术对比
技术方案 | 适用场景 | 延迟 | 开发复杂度 | 扩展性 |
---|---|---|---|---|
WebSocket | 实时双向通信 | 毫秒级 | 中 | |
消息队列(如Kafka) | 削峰填谷/异步处理 | 秒级 | 低 | |
HTTP轮询 | 简单低频场景 | 秒级 | 低 |
推荐组合方案:WebSocket处理实时消息 + 消息队列持久化存储
分步骤实现详解
建立实时通信通道(WebSocket)
使用Java API javax.websocket
创建服务端点:
@ServerEndpoint("/chat/{systemId}") public class ChatEndpoint { private static Map<String, Session> sessions = new ConcurrentHashMap<>(); @OnOpen public void onOpen(Session session, @PathParam("systemId") String systemId) { sessions.put(systemId, session); // 存储会话ID } @OnMessage public void onMessage(String message, Session session) { // 解析JSON消息体(示例:{"to":"sys2","content":"Hello"}) JSONObject msg = new JSONObject(message); Session targetSession = sessions.get(msg.getString("to")); if(targetSession != null) { targetSession.getAsyncRemote().sendText(msg.toString()); } } }
消息持久化存储(Kafka+MySQL)
(1) 消息生产端(WebSocket层追加)
@OnMessage public void onMessage(String message) { kafkaTemplate.send("chat_record_topic", message); // 发送到Kafka }
(2) 消息消费端(存储到数据库)
@KafkaListener(topics = "chat_record_topic") public void saveChatRecord(String record) { ChatMessage msg = parseJson(record); // JSON转对象 jdbcTemplate.update( "INSERT INTO chat_records(sender, receiver, content, time) VALUES(?,?,?,NOW())", msg.getSender(), msg.getReceiver(), msg.getContent() ); }
数据库表结构设计
CREATE TABLE chat_records ( id BIGINT AUTO_INCREMENT PRIMARY KEY, sender VARCHAR(50) NOT NULL, -- 发送方系统ID receiver VARCHAR(50) NOT NULL, -- 接收方系统ID content TEXT NOT NULL, -- 消息内容 send_time DATETIME(3) NOT NULL, -- 精确到毫秒 is_delivered BOOLEAN DEFAULT 0 -- 消息投递状态 );
关键优化策略
-
消息可靠性保障
- Kafka开启ACK=all保证不丢失消息
- 数据库事务+重试机制(Spring Retry)
@Retryable(maxAttempts=3, backoff=@Backoff(delay=1000)) public void saveWithRetry(ChatMessage msg) { ... }
-
高并发处理
- WebSocket会话分组管理(如按systemId分租户)
- MySQL分库分表(以sender_id做sharding key)
-
安全防护
- 连接时认证(WebSocket握手拦截器)
public class AuthInterceptor extends HttpSessionHandshakeInterceptor { @Override public boolean beforeHandshake(..., HttpServletRequest request) { String token = request.getParameter("auth"); return validateToken(token); // JWT验证 } }
- AES加密传输
- SQL注入防护(MyBatis参数绑定)
- 连接时认证(WebSocket握手拦截器)
完整架构图
┌─────────────┐ WebSocket ┌─────────────┐
│ System A │◄───(实时通信)─────►│ System B │
└──────┬──────┘ └──────┬──────┘
│ │
│ Kafka消息 │
└───────────► [消息队列] ◄──────────┘
│
▼
[MySQL集群存储]
│
▼
[ELK日志分析平台] → 提供消息审计
性能压测数据参考
并发用户数 | 消息量/秒 | 平均延迟 | CPU占用 |
---|---|---|---|
500 | 3,200 | 28ms | 42% |
2,000 | 12,800 | 63ms | 78% |
5,000 | 21,000 | 142ms | 91% |
测试环境:4核8G服务器 ×3,Kafka 3节点,MySQL读写分离
运维监控建议
- 关键指标监控:
- WebSocket连接数(Netty指标)
- Kafka堆积量(lag监控)
- MySQL慢查询(Percona Toolkit)
- 日志追踪:
- 每条消息生成唯一traceId
- 接入SkyWalking实现全链路追踪
通过上述方案,可实现日均千万级消息处理,消息丢失率<0.001%,满足金融级场景需求。
引用说明:
- WebSocket规范:RFC 6455协议文档
- Kafka官方文档:Apache Kafka 3.0 Persistence Design
- 安全标准:OWASP WebSocket安全指南
- 性能数据来源:JMeter压测报告(测试代码库:GitHub.com/websocket-benchmark)
- 数据库设计参考:《阿里巴巴Java开发手册》嵩山版