业务需求 需要实现一个告警指定时间(N)内去重,并且定时(M)自动恢复的功能,即规定时间(N)内,不再发送同一个告警,并且当等待一段时间(M),若未再接收到告警,则触发消警(自动恢复)操作.
思路
使用rabbitmq的死信队列
使用redis,给key添加过期监听事件
使用ExpiringMap,带有过期时间的map
实践 rabbitmq
不符合业务需求,当一条同类型告警来临时,需要更新消息的失效时间,而mq进入队列的消息无法修改
redis 使用步骤
1.修改redis配置文件,notify-keyspace-events设置为Ex
notify-keyspace-events Ex K 键空间事件,以__keyspace@<db>__前缀发布。 E 键事件事件,以__keyevent@<db>__前缀发布。 g 通用命令(非类型特定),如DEL,EXPIRE,RENAME等等 $ 字符串命令 l 列表命令 s 集合命令 h 哈希命令 z 有序集合命令 x 过期事件(每次键到期时生成的事件) e 被驱逐的事件(当一个键由于达到最大内存而被驱逐时产生的事件) A g$lshzxe的别名,因此字符串AKE表示所有的事件。
2.程序测试
@Override public void run (String... args) throws Exception { SimpleDateFormat ft1 = new SimpleDateFormat ("yyyy-MM-dd hh:mm:ss" ); for (int i = 1 ; i < 50000 ; i++) { Calendar instance = Calendar.getInstance(); instance.add(Calendar.SECOND, i * 5 ); Date date = instance.getTime(); redisTemplate.opsForValue().set( i + "@" + ft1.format(date), "value" , i * 5 , TimeUnit.SECONDS); } System.out.println("初始化key" ); Thread.sleep(10000000 ); System.out.println("休眠结束" ); }
性能测试
./redis-benchmark -h 127.0.0.1 -p 6379 -c 20 -n 1000000 -t get -d 10 -P 8 -q
命令说明:
向127.0.0.1:6379这个Redis发送100万个请求,使用20个长连接发送,所有请求都是get命令,每个get命令的包体为10字节,使用8条Pipeline通道发送,并且只显示requests per second这一个结果。
参数说明:
-h 目标Redis服务网络地址
-p 目标Reids服务的端口
-c 客户端并发长连接数
-n 本次测试需要发起的请求数
-t 测试请求的方法
-d 测试请求的数据大小 字节
-P 开启Pipeline模式,并制定Pipeline通道数量
-q 只显示requests per second这一个结果
本地redis测试
连接数
字节长度
请求数
整体RPS
单连接RPS
1
10
1000000
52408.16
52408.16
1
59
1000000
53610.68
53610.68
1
100
1000000
53963.63
53963.63
1
1000
1000000
54451.40
54451.40
1
10000
1000000
54725.55
54725.55
5
59
1000000
67467.28
13493.45
10
59
1000000
70407.66
7040.76
50
59
1000000
60034.82
1200.69
远程redis测试
连接数
字节长度
请求数
整体RPS
单连接RPS
不开pipeline
1
10
1000000
12745.13
12745.13
1648.67
1
59
1000000
12521.28
12521.28
1695.92
1
100
1000000
12706.53
12706.53
1651.67
1
1000
1000000
12842.72
12842.72
1611.79
1
10000
1000000
13395.23
13395.23
1695.62
5
59
1000000
54580.76
10916.15
10
59
1000000
98570.72
9857.07
50
59
1000000
79027.84
1580.55
程序内存,CPU占用
总结
1.redis中存在10000左右过期key时,会导致监听有延迟出现,可能高达几分钟.
2.当不在同一台服务器上时,redis的时间需要与程序的时间同步.
3.redis中存入key的长度,在1000以下时,影响不大,存在跨服务器时,性能影响较大,可能与服务器性能,网络等多方面因素有关
ExpiringMap 开源地址
简介
1.可设置Map中的Entry在一段时间后自动过期。 2.可设置Map最大容纳值,当到达Maximum size后,再次插入值会导致Map中的第一个值过期。 3.可添加监听事件,在监听到Entry过期时调度监听函数。 4.可以设置懒加载,在调用get()方法时创建对象。
使用步骤
1.添加maven依赖
<dependency> <groupId>net.jodah</groupId> <artifactId>expiringmap</artifactId> <version>0.5 .8 </version> </dependency>
2.测试代码
@Test public void test9 () throws InterruptedException { ExpiringMap<String, String> map = ExpiringMap.builder() .maxSize(100 ) .variableExpiration() .expirationPolicy(ExpirationPolicy.ACCESSED) .asyncExpirationListener((ExpirationListener<String, String>) (key, value) -> System.out.println("key:" + key + " value:" + value + "\n失效时间:" + System.currentTimeMillis())) .build(); System.out.println("当前时间:" + System.currentTimeMillis()); map.put("test" , "test123" , 6 , TimeUnit.SECONDS); Thread.sleep(5000 ); System.err.println(map.get("test" )); Thread.sleep(10000 ); System.err.println(map.get("test" )); }
3.实测误差
性能测试
源码解析
1.构建
ExpiringMap<String, String> map = ExpiringMap.builder() .maxSize(100000 ) .expiration(1 , TimeUnit.SECONDS) .expirationPolicy(ExpirationPolicy.ACCESSED) .variableExpiration() .asyncExpirationListener(ExpiringMapTest::remindAsyncExpiration) .entryLoader(name -> "default" ) .build();
2.初始化
private ExpiringMap (final Builder<K, V> builder) { if (EXPIRER == null ) { synchronized (ExpiringMap.class) { if (EXPIRER == null ) { EXPIRER = Executors.newSingleThreadScheduledExecutor( THREAD_FACTORY == null ? new NamedThreadFactory ("ExpiringMap-Expirer" ) : THREAD_FACTORY); } } } if (LISTENER_SERVICE == null && builder.asyncExpirationListeners != null ) { synchronized (ExpiringMap.class) { if (LISTENER_SERVICE == null ) { LISTENER_SERVICE = (ThreadPoolExecutor) Executors.newCachedThreadPool( THREAD_FACTORY == null ? new NamedThreadFactory ("ExpiringMap-Listener-%s" ) : THREAD_FACTORY); } } } variableExpiration = builder.variableExpiration; entries = variableExpiration ? new EntryTreeHashMap <K, V>() : new EntryLinkedHashMap <K, V>(); if (builder.expirationListeners != null ) expirationListeners = new CopyOnWriteArrayList <ExpirationListener<K, V>>(builder.expirationListeners); if (builder.asyncExpirationListeners != null ) asyncExpirationListeners = new CopyOnWriteArrayList <ExpirationListener<K, V>>(builder.asyncExpirationListeners); expirationPolicy = new AtomicReference <ExpirationPolicy>(builder.expirationPolicy); expirationNanos = new AtomicLong (TimeUnit.NANOSECONDS.convert(builder.duration, builder.timeUnit)); maxSize = builder.maxSize; entryLoader = builder.entryLoader; expiringEntryLoader = builder.expiringEntryLoader; }
3.过期key存储,EntryMap,EntryLinkedHashMap,EntryTreeHashMap
private interface EntryMap <K, V> extends Map <K, ExpiringEntry<K, V>> { ExpiringEntry<K, V> first () ; void reorder (ExpiringEntry<K, V> entry) ; Iterator<ExpiringEntry<K, V>> valuesIterator () ; }
4.过期map实体ExpiringEntry,重写了compareTo方法,按过期时间从小到大排序
static class ExpiringEntry <K, V> implements Comparable <ExpiringEntry<K, V>> {final AtomicLong expirationNanos; final AtomicLong expectedExpiration; final AtomicReference<ExpirationPolicy> expirationPolicy; final K key; volatile Future<?> entryFuture; V value; volatile boolean scheduled; @Override public int compareTo (ExpiringEntry<K, V> other) { if (key.equals(other.key)) return 0 ; return expectedExpiration.get() < other.expectedExpiration.get() ? -1 : 1 ; } }
5.添加元素 ,过期key的实现逻辑
V putInternal (K key, V value, ExpirationPolicy expirationPolicy, long expirationNanos) { writeLock.lock(); try { ExpiringEntry<K, V> entry = entries.get(key); V oldValue = null ; if (entry == null ) { entry = new ExpiringEntry <K, V>(key, value, variableExpiration ? new AtomicReference <ExpirationPolicy>(expirationPolicy) : this .expirationPolicy, variableExpiration ? new AtomicLong (expirationNanos) : this .expirationNanos); if (entries.size() >= maxSize) { ExpiringEntry<K, V> expiredEntry = entries.first(); entries.remove(expiredEntry.key); notifyListeners(expiredEntry); } entries.put(key, entry); if (entries.size() == 1 || entries.first().equals(entry)) scheduleEntry(entry); } else { oldValue = entry.getValue(); if (!ExpirationPolicy.ACCESSED.equals(expirationPolicy) && ((oldValue == null && value == null ) || (oldValue != null && oldValue.equals(value)))) return value; entry.setValue(value); resetEntry(entry, false ); } return oldValue; } finally { writeLock.unlock(); } }
void scheduleEntry (ExpiringEntry<K, V> entry) { if (entry == null || entry.scheduled) return ; Runnable runnable = null ; synchronized (entry) { if (entry.scheduled) return ; final WeakReference<ExpiringEntry<K, V>> entryReference = new WeakReference <ExpiringEntry<K, V>>(entry); runnable = new Runnable () { @Override public void run () { ExpiringEntry<K, V> entry = entryReference.get(); writeLock.lock(); try { if (entry != null && entry.scheduled) { entries.remove(entry.key); notifyListeners(entry); } try { Iterator<ExpiringEntry<K, V>> iterator = entries.valuesIterator(); boolean schedulePending = true ; while (iterator.hasNext() && schedulePending) { ExpiringEntry<K, V> nextEntry = iterator.next(); if (nextEntry.expectedExpiration.get() <= System.nanoTime()) { iterator.remove(); notifyListeners(nextEntry); } else { scheduleEntry(nextEntry); schedulePending = false ; } } } catch (NoSuchElementException ignored) { } } finally { writeLock.unlock(); } } }; Future<?> entryFuture = EXPIRER.schedule(runnable, entry.expectedExpiration.get() - System.nanoTime(), TimeUnit.NANOSECONDS); entry.schedule(entryFuture); } }
总结
1.实测内存,CPU占用,以及失效监听触发延时都优于使用redis
2.符合业务需求
最后 每个需求都有不同的解决方案,不要吊死在一棵树上!