实时交易数据监控系统的设计与实现

文章分为四个部分

  • 1、主要功能
  • 2、运用的技术
  • 3、系统设计
  • 4、优化与总结

1、主要功能

对平台支付网关的交易订单进行实时的统计,包括实时的交易金额与交易订单量、不同支付方式的交易总额、订单量以及占比、当天各个时间段的数据统计折线图,实现效果图如下:

untitled.jpg

2、运用的技术

  • Redis:利用Redis的消息发布与订阅功能、以及List、SortedSet、Hash的数据结构特性
  • WebSocket:负责将实时汇总的交易数据推送至浏览器客户端

3、系统设计

实时交易数据监控系统所涉及的工程包括交易服务、监控统计服务、监控应用(Dubbo服务化)。

  • 交易服务在交易成功后向Redis中发布消息并将数据发送至Redis的list队列
  • 监控服务负责Redis消息的订阅并进行统计,统计完成后将实时的统计结果再次发送至Redis
  • 监控应用作为WebSocket的服务端,也负责监听监控服务推送过来的实时统计数据并通过WebSocket将数据推送至客户端。

Redis数据结构图如下:
Redis数据结构.png

利用list的lpush、lpop功能进行对数据的存取操作,SortedSet最开始主要是用于排序,将交易时间作为score进行排序,但是因为涉及到一些数据的计算,在高并发以及分布式部署的情况下,利用SortedSet进行数据统计是会存在问题的,文末会提到,hash结构主要是用于对数据进行原子性的计算。

UML时序图如下:
Paste_Image.png

3.1、交易系统——支付服务

支付服务在交易成功后,会给Redis发布一条订单记录消息,并向Redis的list列表lpush一条同样的订单记录信息,为了不影响正常的支付业务流程,所以采用的是异步的方式,伪代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
/**
* Redis消息通道
*/
@Value("#{settings['redis.trade.channel']}")
private String redisChannel;
/**
* 微信支付的订单队列key
*/
@Value("#{settings['redis.trade.wxDetails']}")
private String redisWxQueue;
/**
* 支付宝支付的订单队列key
*/
@Value("#{settings['redis.trade.alipayDetails']}")
private String redisAlipayQueue;
/**
* 单个线程的线程池
*/
protected static ExecutorService executorService = Executors.newSingleThreadExecutor();

/**
* 交易成功后需要执行的业务逻辑
* @param paymentRecord
*/
public void successPayment(final PaymentRecord paymentRecord) {
// do otherthing...
// 异步发送消息
executorService.submit(new Runnable() {
@Override
public void run() {
try {
pushPaymentRecordMonitorVo(paymentRecord);
} catch (Exception e) {
log.error("payment send to redis fail,PaymengRecord:" + JsonUtil.toJsonString(paymentRecord));
}
}
});
}
/**
* 将交易成功的订单信息插入至Redis队列并发送一条通知通知
* @param paymentRecord
* @throws Exception
*/
private void pushPaymentRecordMonitorVo (PaymentRecord paymentRecord) throws Exception{
PaymentRecordMonitorVo paymentRecordMonitorVo = new PaymentRecordMonitorVo();
paymentRecordMonitorVo.setMerchantOrderNo(paymentRecord.getMerchantOrderNo());
paymentRecordMonitorVo.setPayWay(paymentRecord.getPayWayCode() == null ? null : paymentRecord.getPayWayCode().name());
paymentRecordMonitorVo.setTradeTime(paymentRecord.getPaySuccessTime());
paymentRecordMonitorVo.setAmount(paymentRecord.getOrderAmount());
log.info("订单消息插入Redis队列...");
if (paymentRecord.getPayWayCode() != null && paymentRecord.getPayWayCode().equals(PayWayEnum.WEIXIN)) {
JedisHelper.dataCluster().lpush(redisWxQueue,JsonUtil.toJsonString(paymentRecordMonitorVo));
} else if (paymentRecord.getPayWayCode() != null && paymentRecord.getPayWayCode().equals(PayWayEnum.ALIPAY)) {
JedisHelper.dataCluster().lpush(redisAlipayQueue,JsonUtil.toJsonString(paymentRecordMonitorVo));
}
log.info("订单消息插入Redis队列结束...");
// 发布消息
log.info("订单消息发布到Redis...");
JedisHelper.dataCluster().publish(redisChannel,JsonUtil.toJsonString(paymentRecordMonitorVo));
log.info("订单消息发布到Redis结束...");
}

3.2、监控服务
3.2.1.主要功能包括:
  • 订阅Redis中交易服务发布过来的订单消息以及获取list列表中的订单数据
  • 根据订单的交易时间,按照每15分钟为一个数据汇总点进行汇总
  • 对每15分钟汇总的SortedSet进行统计后,将结果再发布至Redis的消息中
3.2.2.遇到的坑:

在监控服务启动的时候会进行Redis的list列表中数据的统计初始化,并开启Redis消息订阅者的监听。但有三个比较坑的地方就是:

(1)因为用的是Redis6个节点组成的一个集群,所以是用JedisCluster,但是JedisCluster在2.8.x版本以上才支持消息的发布与订阅,项目原先用的是2.7.3版本
解决方案:把项目Jedis版本改为2.8.1,pom.xml内容如下:

1
2
3
4
5
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.8.1</version>
</dependency>

(2)Redis的消息发布与订阅,在订阅方必须要手动的调用subscribe()方法,并将监听者和需要监听的通道作为参数传入才能开启监听,而像ActiveMQ这种消息中间件是不需要显示的调用,只需配置好消息监听者就会自动监听的。

还有一个坑就是subscribe()方法是一个线程阻塞方法,本想在项目启动的时候就调用subscribe()开启消息的订阅,结果发现方法调用后,其他的代码根本没法往下执行。
解决方案是:在项目启动的时候调用subscribe()方法开启消息监听,并且新开一个线程去调用subscribe()方法来避免阻塞主线程。

(3)Redis不支持消息的持久化。在订阅者没有启动的时候,消息发布者将消息发出去了,订阅者没有收到,那订阅者重新启动的时候也不会收到之前发的消息了,而像ActiveMQ是支持消息的持久化的。

解决方案:在往Redis发布消息的时候也同样往Redis的list列表中lpush一条同样消息的数据(参照上面交易服务中的代码),消息订阅者接收到消息并进行相应的业务处理后,再将list列表中的数据删除,那在监控服务挂掉的情况下,Redis消息无法正常被监听消费,但是Redis的list列表中还是会存有消息的数据,所以后续我们可以从list列表中取出消息数据再进行相应的业务处理,这样就间接的实现了Redis消息的持久化。

3.2.3.部分代码

(1)RedisSubscribeHelper.java:监控服务启动时,进行Redis队列中数据的统计初始化,并开启Redis消息订阅者的监听的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package com.ylp.core.monitor.redis;

import com.ylp.common.tools.utils.JsonUtil;
import com.ylp.core.monitor.biz.MonitorBiz;
import com.ylp.facade.monitor.utils.JedisHelper;
import com.ylp.facade.monitor.utils.MonitorUtils;
import com.ylp.facade.monitor.vo.PaymentRecordMonitorVo;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;

import javax.annotation.PostConstruct;
import java.io.IOException;
import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* @author: 会跳舞的机器人
* @date: 16/8/25 下午2:24
* @description:作用1:项目启动时,对Redis消息未进行处理的订单数据进行初始化处理; 作用2:Redis消息订阅的启动器,项目启动时新启动一个线程进行消息的订阅
*/
@Component
public class RedisSubscribeHelper {
private Logger logger = Logger.getLogger(RedisSubscribeHelper.class);

@Autowired
private RedisSubscribeListener redisSubscribeListener;
@Autowired
private MonitorBiz monitorBiz;

/**
* Redis消息订阅的channel
*/
@Value("#{settings['redis.trade.channel']}")
private String redisChannel;

/**
* 微信支付的订单数据key
*/
@Value("#{settings['redis.trade.wxDetails']}")
private String redisWxQueue;

/**
* 支付宝支付的订单数据key
*/
@Value("#{settings['redis.trade.alipayDetails']}")
private String redisAlipayQueue;

/**
* redis消息订阅的subscribe方法为阻塞方法,所以需要单独启动一个线程进行消息订阅
*/
private ExecutorService executorService = Executors.newSingleThreadExecutor();


/**
* 构造函数执行后,执行数据初始化与消息订阅
*/
@PostConstruct
public void doWork() {
init();
subscribe();
}

/**
* 数据的初始化,不能一次性取出所有的数据,因为有可能系统在启动的时候Redis的list会有元素push进来
*/
private void init() {
long start = System.currentTimeMillis();
// 避免初始化加载时,JedisCluste还没完成初始化,所以需要sleep
JedisCluster jedisCluster = JedisHelper.dataCluster();
if (jedisCluster == null) {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
logger.info("开始从Redis中获取微信订单初始化数据....");
dealData(jedisCluster, redisWxQueue);
logger.info("微信订单记录数据全部处理完毕...");

logger.info("开始从Redis中获取支付宝订单初始化数据....");
dealData(jedisCluster, redisAlipayQueue);
logger.info("支付宝订单记录数据全部处理完毕...");

logger.info("初始化数据工作耗时:" + (System.currentTimeMillis() - start) + "ms");
}

/**
* 根据队列来处理队列中的数据
*
* @param queueName
*/
private void dealData(JedisCluster jedisCluster, String queueName) {
long startTime = MonitorUtils.getCurrentDayZeroTime().getTime();
long endTime = MonitorUtils.getDateEndTime().getTime();
Date tradeTime = null;
String str = "";
PaymentRecordMonitorVo paymentRecordMonitorVo = null;
while (true) {
try {
str = jedisCluster.lpop(queueName);
if (StringUtils.isEmpty(str)) {
break;
}
paymentRecordMonitorVo = JsonUtil.jsonToObject(str, PaymentRecordMonitorVo.class);
tradeTime = paymentRecordMonitorVo.getTradeTime();
// 只处理今天的数据
if (tradeTime != null && tradeTime.getTime() >= startTime && tradeTime.getTime() <= endTime) {
monitorBiz.periodStatistics(paymentRecordMonitorVo);
}
} catch (IOException e) {
e.printStackTrace();
break;
}
}
}

/**
* Redis消息订阅
*/
private void subscribe() {
logger.info("启动Redis消息订阅,channel:" + redisChannel);
executorService.submit(new Runnable() {
@Override
public void run() {
JedisHelper.coreCluster().subscribe(redisSubscribeListener, redisChannel);
}
});
}

}

(2)RedisSubscribeListener.java:Redis消息监听者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package com.ylp.core.monitor.redis;
import com.ylp.common.tools.utils.JsonUtil;
import com.ylp.core.monitor.biz.MonitorBiz;
import com.ylp.facade.monitor.vo.PaymentRecordMonitorVo;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Client;
import redis.clients.jedis.JedisPubSub;
import java.io.IOException;
/**
* @author: 会跳舞的机器人
* @date: 16/8/23 下午2:57
* @description: Redis消息队列监听者
*/
@Component("redisSubscribeListener")
public class RedisSubscribeListener extends JedisPubSub {
private Logger logger = Logger.getLogger(RedisSubscribeListener.class);
@Autowired
private MonitorBiz monitorBiz;
public RedisSubscribeListener() {
super();
}
@Override
public void onMessage(String channel, String message) {
logger.info("Redis received...");
logger.info("channel:" + channel + ",message:" + message);
if (StringUtils.isEmpty(message)) {
return;
}
PaymentRecordMonitorVo paymentRecordMonitorVo = null;
try {
paymentRecordMonitorVo = JsonUtil.jsonToObject(message, PaymentRecordMonitorVo.class);
} catch (IOException e) {
logger.error("message转PaymentRecordMonitorVo异常,message=" + message, e);
return;
}
// 先把该笔交易订单信息解析存入对应的统计时间段内,每隔15分钟汇总一次数据,存入Redis数据汇总集合
long start = System.currentTimeMillis();
monitorBiz.periodStatistics(paymentRecordMonitorVo);
logger.info("monitorBiz.periodStatistics()执行耗时:" + (System.currentTimeMillis() - start) + "ms");
// 再统计当天各个时间段的总数据,即为实时的交易数据
start = System.currentTimeMillis();
monitorBiz.publishRealTimeData();
logger.info("monitorBiz.publishRealTimeData()执行耗时:" + (System.currentTimeMillis() - start) + "ms");
super.onMessage(channel, message);
}

}

(3)MonitorBiz.java:Redis数据统计业务类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package com.ylp.core.monitor.biz;
import com.ylp.common.tools.utils.DateUtils;
import com.ylp.common.tools.utils.JsonUtil;
import com.ylp.facade.monitor.utils.JedisHelper;
import com.ylp.facade.monitor.utils.MonitorUtils;
import com.ylp.facade.monitor.utils.NumberUtil;
import com.ylp.facade.monitor.vo.PaymentRecordMonitorVo;
import com.ylp.facade.monitor.vo.ScreenDataVo;
import com.ylp.facade.monitor.vo.StatisticsVo;
import com.ylp.facade.trade.enums.PayWayEnum;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import redis.clients.jedis.Tuple;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author: 会跳舞的机器人
* @date: 16/8/25 下午6:12
* @description: Redis数据统计业务类
*/
@Component("monitorBiz")
public class MonitorBiz {
private Logger logger = Logger.getLogger(MonitorBiz.class);
/**
* 实时数据监控的数据队列
*/
@Value("#{settings['redis.trade.screen.channel']}")
private String redisTradeScreenChannel;
/**
* 微信支付的订单详细数据key
*/
@Value("#{settings['redis.trade.wxDetails']}")
private String redisWxQueue;
/**
* 支付宝支付的订单详细数据key
*/
@Value("#{settings['redis.trade.alipayDetails']}")
private String redisAlipayQueue;
/**
* 微信支付的订单汇总数据
*/
@Value("#{settings['redis.trade.wxSummary']}")
private String redisWxStatisticsQueue;
/**
* 支付宝支付的订单汇总数据
*/
@Value("#{settings['redis.trade.alipaySummary']}")
private String redisAlipayStatisticsQueue;
/**
* 单个线程的线程池
*/
private ExecutorService executorService = Executors.newSingleThreadExecutor();
/**
* 分时间间隔的数据统计,将时间段内的数据存入Redis有序集合
*
* @param paymentRecordMonitorVo 交易订单信息
* @return
*/
public void periodStatistics(final PaymentRecordMonitorVo paymentRecordMonitorVo) {
// 根据支付订单的交易时间获取该笔交易应该划分至哪个统计时间段内
Map<String, Date> dateMap = MonitorUtils.getTimeSpace(paymentRecordMonitorVo.getTradeTime());
Date start = dateMap.get("previous");
Date end = dateMap.get("next");
logger.info("start:" + DateUtils.dateToTime(start) + ",end:" + DateUtils.dateToTime(end));
String payWay = paymentRecordMonitorVo.getPayWay();
String queue = payWay.equals(PayWayEnum.WEIXIN.name()) ? redisWxStatisticsQueue : payWay.equals(PayWayEnum.ALIPAY.name()) ? redisAlipayStatisticsQueue : "";
StatisticsVo result = new StatisticsVo();
result.setPayWay(payWay);
result.setPeriodTime(end);
// 查询该时间段内,统计集合中的数据
Set<String> wxSet = JedisHelper.dataCluster().zrangeByScore(queue, end.getTime(), end.getTime());
// 如果汇总集合中没有该时间段的数据,那么则新增
if (CollectionUtils.isEmpty(wxSet)) {
logger.info("wxSet is null.... ");
result.setTotalOrderCount(1);
result.setTotalOrderAmount(paymentRecordMonitorVo.getAmount().doubleValue());
JedisHelper.dataCluster().zadd(queue, end.getTime(), JsonUtil.toJsonString(result));
} else {
logger.info("wxSet is not null and wxSet.size()=" + wxSet.size());
Double amount = 0.0;
Integer count = 0;
Iterator<String> iterator = wxSet.iterator();
while (iterator.hasNext()) {
try {
StatisticsVo statisticsVo = JsonUtil.jsonToObject(iterator.next(), StatisticsVo.class);
logger.info("statisticsVo:" + JsonUtil.toJsonString(statisticsVo));
amount = NumberUtil.add(statisticsVo.getTotalOrderAmount().doubleValue(), amount.doubleValue());
count = count + statisticsVo.getTotalOrderCount();
} catch (IOException e) {
e.printStackTrace();
}
}
result.setTotalOrderCount(count + 1);
result.setTotalOrderAmount(NumberUtil.add(amount, paymentRecordMonitorVo.getAmount().doubleValue()));
// 先移除旧数据
long removeRow = JedisHelper.dataCluster().zremrangeByScore(queue, end.getTime(), end.getTime());
logger.info("移除旧数据记录数:" + removeRow);
// 再插入最新的数据
JedisHelper.dataCluster().zadd(queue, end.getTime(), JsonUtil.toJsonString(result));
}
// 异步删除Redis队列中的订单详细信息
final String key = payWay.equals(PayWayEnum.WEIXIN.name()) ? redisWxQueue : payWay.equals(PayWayEnum.ALIPAY.name()) ? redisAlipayQueue : "";
executorService.submit(new Runnable() {
@Override
public void run() {
// 指定元素进行删除
Long row = JedisHelper.dataCluster().lrem(key, 0, JsonUtil.toJsonString(paymentRecordMonitorVo));
logger.info("删除明细记录数:" + row);
}
});
logger.info("result:" + JsonUtil.toJsonString(result));
}
/**
* 实时数据推送
*/
public void publishRealTimeData() {
ScreenDataVo screenDataVo = dayStatistics();
// 将屏幕监控的数据放入Redis消息中
JedisHelper.dataCluster().publish(redisTradeScreenChannel, JsonUtil.toJsonString(screenDataVo));
logger.info("监控实时汇总数据消息发送成功...");
}
}

3.3、监控应用系统

系统采用前后端分离的模式,后端提供获取初始化数据的API给前端进行调用,数据初始化完成后,每笔交易产生的数据变动都通过WebSocket的方式实时传输到浏览器客户端进行展示。

3.3.1.主要功能:
  • 实现WebSocket的服务端,将实时的交易数据推送至客户端
  • 作为Redis消息的订阅方,获取到消息数据后,将数据通过WebSocket的方式广播出去
    3.3.2.遇到的坑

(1)在本地测试的时候,WebSocket服务端与客户端能进行正常的通信,部署至服务器后,客户端却无法连接到服务端,后来发现本地IDEA运行的是Tomcat8,服务器上部署的是Tomcat7.x版本,于是把服务器上的Tomcat版本换成8后就能进行正常的通信了。
WebSocket所需要的jar包如下:

1
2
3
4
5
6
7
8
9
10
<dependency>
<groupId>javax.websocket</groupId>
<artifactId>javax.websocket-api</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>org.glassfish.tyrus.bundles</groupId>
<artifactId>tyrus-standalone-client</artifactId>
<version>1.9</version>
</dependency>

3.3.3.部分代码

(1)WebSocketServer.java:WebSocket服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
package com.ylp.webSocket.server;
import org.apache.log4j.Logger;
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author: 会跳舞的机器人
* @date: 16/8/24 上午11:25
* @description:WebSocket服务端
*/
@ServerEndpoint("/tradeSocketServer")
public class WebSocketServer {
private Logger logger = Logger.getLogger(this.getClass().getName());
/**
* 客户端会话集合
*/
private static Map<String, Session> sessionMap = new ConcurrentHashMap<String, Session>();
/**
* 连接建立成功需要执行的方法
*
* @param session
*/
@OnOpen
public void onOpen(Session session) {
logger.info("onOpen,sessionId:" + session.getId());
sessionMap.put(session.getId(), session);
}
/**
* 收到客户端调用后需要执行的方法
*
* @param message
* @param session
*/
@OnMessage
public void onMessage(String message, Session session) {
logger.info("received message:" + message);
broadcastAll(message);
}
/**
* 广播给所有客户端
*
* @param message
*/
public static void broadcastAll(String message) {
Set<Map.Entry<String, Session>> set = sessionMap.entrySet();
for (Map.Entry<String, Session> i : set) {
try {
i.getValue().getBasicRemote().sendText(message);
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 连接关闭时执行的方法
*
* @param session
* @param closeReason
*/
@OnClose
public void onClose(Session session, CloseReason closeReason) {
sessionMap.remove(session.getId());
logger.info(String.format("Session %s closed because of %s", session.getId(), closeReason));
}
/**
* 发生错误时执行的方法
*
* @param session
* @param throwable
*/
@OnError
public void error(Session session, Throwable throwable) {
sessionMap.remove(session.getId());
logger.error("异常", throwable);
}
}

(3)RedisSubscribeListener.java:Redis消息监听者,这个工程中又要实现一遍,只不过是收到消息后需要执行的业务逻辑不一样,同样的,RedisSubcribeHelper.java类也要重新实现一遍,后续可以优化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package com.ylp.redis;
import com.ylp.common.tools.utils.JsonUtil;
import com.ylp.facade.monitor.vo.ScreenDataVo;
import com.ylp.webSocket.server.WebSocketServer;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Logger;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Client;
import redis.clients.jedis.JedisPubSub;
import java.io.IOException;
/**
* @author: 会跳舞的机器人
* @date: 16/8/23 下午2:57
* @description: Redis消息队列监听者
*/
@Component("redisSubscribeListener")
public class RedisSubscribeListener extends JedisPubSub {
private Logger logger = Logger.getLogger(RedisSubscribeListener.class);
public RedisSubscribeListener() {
super();
}
@Override
public void onMessage(String channel, String message) {
logger.info("Redis received...");
logger.info("channel:" + channel + ",message:" + message);
if (StringUtils.isEmpty(message)) {
return;
}
try {
ScreenDataVo screenDataVo = JsonUtil.jsonToObject(message, ScreenDataVo.class);
logger.info("screenDataVo:" + JsonUtil.toJsonString(screenDataVo));
} catch (IOException e) {
logger.error("message转ScreenDataVo异常,message=" + message, e);
e.printStackTrace();
}
// 将实时交易数据推送至WebSocketServer
WebSocketServer.broadcastAll(message);
super.onMessage(channel, message);
}
}

(4)WebSocket的客户端(模拟)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
<%@ page language="java" contentType="text/html; charset=UTF-8"
pageEncoding="UTF-8"%>
<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
<title>Index</title>
<script type="text/javascript">
var ws = null;
function startWebSocket() {
if ('WebSocket' in window)
ws = new WebSocket("ws://172.xx.xx.xx/tradeSocketServer");
else if ('MozWebSocket' in window)
ws = new MozWebSocket("ws://172.xx.xx.xx/tradeSocketServer");
else
alert("not support");
ws.onmessage = function (evt) {
alert(evt.data);
};
ws.onclose = function (evt) {
alert("close");
};
ws.onopen = function (evt) {
alert("open");
};
}
function sendMsg() {
ws.send(document.getElementById('writeMsg').value);
}
</script>
</head>
<body onload="startWebSocket();">
<input type="text" id="writeMsg"></input>
<input type="button" value="send" onclick="sendMsg()"></input>
</body>
</html>

3.3.4.WebSocket服务端通过域名访问配置Nginx

1.WebSocket服务端如果要通过域名来访问,需要配置Nginx,附上Nginx相关的配置信息,需要特别配置map选项
Nginx版本:Tengine version: Tengine/2.1.1 (nginx/1.6.2)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
map $http_upgrade $connection_upgrade {
default upgrade;
'' close;
}
server {
listen 80;
server_name xxx.xxx.com;
location / {
root /usr/local/www/monitorsystem;
index index.html;
try_files $uri $uri/ /index.html =404;
}
location /monitor/ {
proxy_pass http://172.xx.xx.xx:0000/;
add_header 'Access-Control-Allow-Origin' '*';
add_header 'Access-Control-Allow-Credentials' 'true';
add_header 'Access-Control-Allow-Methods' 'POST, GET, OPTIONS,PUT,DELETE';
add_header 'Access-Control-Allow-Headers' '*,token';
proxy_set_header Host $http_host;
proxy_set_header Cookie $http_cookie;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
proxy_read_timeout 1200;
client_max_body_size 100m;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
}

2.遇到的坑

在客户端不活动的情况下,默认60秒后会客户端会自动断开与服务端的连接,通过Nginx中的proxy_red_timeout参数可以配置,客户端也可以通过每隔一段时间去请求一次服务端,保持于服务端的连接。

4、总结与优化

1、在测试部门进行高并发测试的情况下,periodStatistics()统计方法会出现数据统计错乱的情况,所以针对这个方法要加上同步代码块

2、上述的代码在单台服务器部署的情况下,基本上没啥问题,但是如果部署在多台服务器上(分布式部署),那么很多问题就发生咯,比如说:
问题1:Redis消息发布与订阅,消息发布出去了,A服务器中的应用会收到消息然后进行统计,B服务器中的应用也会收到相同的消息然后进行计算,那计算出来的结果就多出一倍的数据了。

问题2:当日实时数据是统计Redis中“每15分钟汇总”数据的SortedSet,即一天被分为96条汇总数据,统计当日实时数据的时候就把96条数据进行累加即可。但是 但是在每一笔订单过来的时候,会先计算出此次统计的最新结果,再把旧的“每15分钟汇总”数据记录给删除,然后再插入此次计算的最新结果(这样做的原因是SortedSet没有更新的方法),代码请看MonitorBiz的periodStatistics()方法,这样就会存在一个问题:A服务器计算出最新统计结构,然后把旧数据删除了,还没插入最新统计数据的时候,B机器去获取当日实时数据,这时候就会缺少那条已删除还未插入的数据,造成数据的错误。

那针对上面两个问题的解决方案是:
问题1解决方案:Redis消息发送的时候只发布一条简单的消息通知即可,把交易订单的数据存入Redis的list队列中,如果多个服务器都收到了消息,便去list队列中获取最新的一条记录,取到则进行处理,取不到则直接返回,Redis的list队列是能保证只有一台机器能取到数据的。

问题2解决方案:最开始设计的时候决定用SortedSet这种数据结构是因为它能满足排序的要求,但是从上面的问题看来,它也只是能解决排序的问题而已(前端要求要按时间排好顺序在“获取初始化数据”的接口中将数据发给他),对于并发的读写数据的操作都不能很好的支持,并不能解决上面所说的问题,所以最后决定改用Hash这种数据结构,原因是因为它支持原子性的数据累加操作,在高并发的情况下,能有很好的支持,如hincrby、hincrbyfloat方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
/**
* 1.每隔15分钟汇总一次数据,存入Redis数据汇总集合
* 2.当日实时数据进行累加操作
*
* @param message 交易订单类型
* @return
*/
public void periodStatistics(String message) {
String listStr = "";
if (StringUtils.isNotEmpty(message) && message.equals(PayWayEnum.WEIXIN.name())) {
listStr = JedisHelper.dataCluster().lpop(redisWxQueue);
} else if (StringUtils.isNotEmpty(message) && message.equals(PayWayEnum.ALIPAY.name())) {
listStr = JedisHelper.dataCluster().lpop(redisAlipayQueue);
}
// 为空则说明数据被其他服务器取走了,则不用进行处理
if (StringUtils.isEmpty(listStr)) {
return;
}
PaymentRecordMonitorVo paymentRecordMonitorVo = null;
try {
paymentRecordMonitorVo = JsonUtil.jsonToObject(listStr, PaymentRecordMonitorVo.class);
} catch (IOException e) {
e.printStackTrace();
return;
}
// 每日实时数据的hash结构增量统计
increaseBy(paymentRecordMonitorVo);
// 分时间段的汇总统计
periodStatistics(paymentRecordMonitorVo);
}
/**
* 每日实时数据的hash结构增量统计
*
* @param paymentRecordMonitorVo
*/
private void increaseBy(PaymentRecordMonitorVo paymentRecordMonitorVo) {
// step1:微信订单数据处理
if (paymentRecordMonitorVo.getPayWay().equals(PayWayEnum.ALIPAY.name())) {
JedisHelper.dataCluster().hincrBy(daySummaryKey, dayAlipayCount, 1);
JedisHelper.dataCluster().hincrByFloat(daySummaryKey, dayAlipayAmount, paymentRecordMonitorVo.getAmount().doubleValue());
// step2:支付宝订单数据处理
} else if (paymentRecordMonitorVo.getPayWay().equals(PayWayEnum.WEIXIN.name())) {
JedisHelper.dataCluster().hincrBy(daySummaryKey, dayWxCount, 1);
JedisHelper.dataCluster().hincrByFloat(daySummaryKey, dayWxAmount, paymentRecordMonitorVo.getAmount().doubleValue());
}
// step3:总的数据增加
JedisHelper.dataCluster().hincrBy(daySummaryKey, dayTotalCount, 1);
JedisHelper.dataCluster().hincrByFloat(daySummaryKey, dayTotalAmount, paymentRecordMonitorVo.getAmount().doubleValue());
}

3、由于系统在设计之初是满足实时交易数据显示的,所以在每一笔订单过来的时候都会进行数据统计,然后利用WebSocket进行实时数据推送到客户端,在进行压力测试的时候,每秒的并发达到上百个,于是浏览器客户端就会发生崩溃的现象,因为每秒都会向浏览器客户端推送上百次
解决方案:在服务端进行数据统计后,每隔2秒把最新的数据推送至浏览器客户端。

4、前期开发的时候为了更好的调试与问题的定位,所以写了很多logger.info()这种日志记录操作,在上生产环境的时候要把这些内容去掉或者日志级别改为debug,这种日志记录会设计到磁盘IO读写,在某些时候可能会成为系统性能的瓶颈。

注:上述的代码大部分都已经过优化,优化后的代码就不贴出来了。

如果您觉得有帮助到您,不妨考虑请作者喝杯咖啡鼓励一下。