业务需求

需要实现一个告警指定时间(N)内去重,并且定时(M)自动恢复的功能,即规定时间(N)内,不再发送同一个告警,并且当等待一段时间(M),若未再接收到告警,则触发消警(自动恢复)操作.

思路

  1. 使用rabbitmq的死信队列
  2. 使用redis,给key添加过期监听事件
  3. 使用ExpiringMap,带有过期时间的map

实践

rabbitmq

不符合业务需求,当一条同类型告警来临时,需要更新消息的失效时间,而mq进入队列的消息无法修改

redis

使用步骤

1.修改redis配置文件,notify-keyspace-events设置为Ex

notify-keyspace-events Ex

K 键空间事件,以__keyspace@<db>__前缀发布。
E 键事件事件,以__keyevent@<db>__前缀发布。
g 通用命令(非类型特定),如DEL,EXPIRE,RENAME等等
$ 字符串命令
l 列表命令
s 集合命令
h 哈希命令
z 有序集合命令
x 过期事件(每次键到期时生成的事件)
e 被驱逐的事件(当一个键由于达到最大内存而被驱逐时产生的事件)
A g$lshzxe的别名,因此字符串AKE表示所有的事件。

2.程序测试

@Override
public void run(String... args) throws Exception {
SimpleDateFormat ft1 = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
for (int i = 1; i < 50000; i++) {
Calendar instance = Calendar.getInstance();
instance.add(Calendar.SECOND, i * 5);
Date date = instance.getTime();
redisTemplate.opsForValue().set( i + "@" + ft1.format(date), "value", i * 5, TimeUnit.SECONDS);
}

System.out.println("初始化key");
Thread.sleep(10000000);
System.out.println("休眠结束");
}

image-20220928202559954

性能测试

./redis-benchmark -h 127.0.0.1 -p 6379 -c 20 -n 1000000 -t get -d 10 -P 8 -q

命令说明:

​ 向127.0.0.1:6379这个Redis发送100万个请求,使用20个长连接发送,所有请求都是get命令,每个get命令的包体为10字节,使用8条Pipeline通道发送,并且只显示requests per second这一个结果。

参数说明:

-h 目标Redis服务网络地址

-p 目标Reids服务的端口

-c 客户端并发长连接数

-n 本次测试需要发起的请求数

-t 测试请求的方法

-d 测试请求的数据大小 字节

-P 开启Pipeline模式,并制定Pipeline通道数量

-q 只显示requests per second这一个结果

本地redis测试

连接数 字节长度 请求数 整体RPS 单连接RPS
1 10 1000000 52408.16 52408.16
1 59 1000000 53610.68 53610.68
1 100 1000000 53963.63 53963.63
1 1000 1000000 54451.40 54451.40
1 10000 1000000 54725.55 54725.55
5 59 1000000 67467.28 13493.45
10 59 1000000 70407.66 7040.76
50 59 1000000 60034.82 1200.69

远程redis测试

连接数 字节长度 请求数 整体RPS 单连接RPS 不开pipeline
1 10 1000000 12745.13 12745.13 1648.67
1 59 1000000 12521.28 12521.28 1695.92
1 100 1000000 12706.53 12706.53 1651.67
1 1000 1000000 12842.72 12842.72 1611.79
1 10000 1000000 13395.23 13395.23 1695.62
5 59 1000000 54580.76 10916.15
10 59 1000000 98570.72 9857.07
50 59 1000000 79027.84 1580.55

程序内存,CPU占用

image-20220928202610650

image-20220928202623004

总结

1.redis中存在10000左右过期key时,会导致监听有延迟出现,可能高达几分钟.

2.当不在同一台服务器上时,redis的时间需要与程序的时间同步.

3.redis中存入key的长度,在1000以下时,影响不大,存在跨服务器时,性能影响较大,可能与服务器性能,网络等多方面因素有关

ExpiringMap

开源地址

简介

1.可设置Map中的Entry在一段时间后自动过期。
2.可设置Map最大容纳值,当到达Maximum size后,再次插入值会导致Map中的第一个值过期。
3.可添加监听事件,在监听到Entry过期时调度监听函数。
4.可以设置懒加载,在调用get()方法时创建对象。

使用步骤

1.添加maven依赖

<dependency> 
<groupId>net.jodah</groupId>
<artifactId>expiringmap</artifactId>
<version>0.5.8</version>
</dependency>

2.测试代码

// 测试监听时间有误差  
@Test
public void test9() throws InterruptedException {
ExpiringMap<String, String> map = ExpiringMap.builder()
.maxSize(100)
.variableExpiration()
.expirationPolicy(ExpirationPolicy.ACCESSED)
// 一个同步,一个异步
.asyncExpirationListener((ExpirationListener<String, String>) (key, value) -> System.out.println("key:" + key + " value:" + value + "\n失效时间:" + System.currentTimeMillis()))
.build();
System.out.println("当前时间:" + System.currentTimeMillis());
map.put("test", "test123", 6, TimeUnit.SECONDS);
Thread.sleep(5000);
System.err.println(map.get("test"));
Thread.sleep(10000);
System.err.println(map.get("test"));
}

3.实测误差

image-20220928202632405

性能测试

image-20220928202641748

image-20220928202650512

源码解析

1.构建

/**
*
* maxSize:map的最大长度,添加第1001个entry时,会导致第1个马上过期(即使没到过期时间)
* expiration:过期时间和过期单位,设置过期时间,则永久有效.
* expirationPolicy:过期策略的使用
* CREATED: 在每次更新元素时,过期倒计时清零
* ACCESSED: 在每次访问元素时,过期倒计时清零
*
* variableExpiration:允许更新过期时间值,如果不设置variableExpiration
* 不允许更改过期时间,一旦执行更改过期时间的操作则会抛出UnsupportedOperationException异常
* expirationListener:同步过期监听
* asyncExpirationListener:异步过期监听
* entryLoader:懒加载,调用get方法时若key不存在创建默认value
*
*/
ExpiringMap<String, String> map = ExpiringMap.builder()
.maxSize(100000)
.expiration(1, TimeUnit.SECONDS)
.expirationPolicy(ExpirationPolicy.ACCESSED)
.variableExpiration()
.asyncExpirationListener(ExpiringMapTest::remindAsyncExpiration)
.entryLoader(name -> "default")
.build();

2.初始化

private ExpiringMap(final Builder<K, V> builder) {
if (EXPIRER == null) {
synchronized (ExpiringMap.class) {
if (EXPIRER == null) {
EXPIRER = Executors.newSingleThreadScheduledExecutor(
THREAD_FACTORY == null ? new NamedThreadFactory("ExpiringMap-Expirer") : THREAD_FACTORY);
}
}
}

if (LISTENER_SERVICE == null && builder.asyncExpirationListeners != null) {
synchronized (ExpiringMap.class) {
if (LISTENER_SERVICE == null) {
LISTENER_SERVICE = (ThreadPoolExecutor) Executors.newCachedThreadPool(
THREAD_FACTORY == null ? new NamedThreadFactory("ExpiringMap-Listener-%s") : THREAD_FACTORY);
}
}
}

variableExpiration = builder.variableExpiration;
entries = variableExpiration ? new EntryTreeHashMap<K, V>() : new EntryLinkedHashMap<K, V>();
if (builder.expirationListeners != null)
expirationListeners = new CopyOnWriteArrayList<ExpirationListener<K, V>>(builder.expirationListeners);
if (builder.asyncExpirationListeners != null)
asyncExpirationListeners = new CopyOnWriteArrayList<ExpirationListener<K, V>>(builder.asyncExpirationListeners);
expirationPolicy = new AtomicReference<ExpirationPolicy>(builder.expirationPolicy);
expirationNanos = new AtomicLong(TimeUnit.NANOSECONDS.convert(builder.duration, builder.timeUnit));
maxSize = builder.maxSize;
entryLoader = builder.entryLoader;
expiringEntryLoader = builder.expiringEntryLoader;
}

3.过期key存储,EntryMap,EntryLinkedHashMap,EntryTreeHashMap

private interface EntryMap<K, V> extends Map<K, ExpiringEntry<K, V>> {
/** Returns the first entry in the map or null if the map is empty. */
ExpiringEntry<K, V> first();

/**
* Reorders the given entry in the map.
*
* @param entry to reorder
*/
void reorder(ExpiringEntry<K, V> entry);

/** Returns a values iterator. */
Iterator<ExpiringEntry<K, V>> valuesIterator();
}

4.过期map实体ExpiringEntry,重写了compareTo方法,按过期时间从小到大排序

static class ExpiringEntry<K, V> implements Comparable<ExpiringEntry<K, V>> {
final AtomicLong expirationNanos;
/** Epoch time at which the entry is expected to expire */
final AtomicLong expectedExpiration;
final AtomicReference<ExpirationPolicy> expirationPolicy;
final K key;
/** Guarded by "this" */
volatile Future<?> entryFuture;
/** Guarded by "this" */
V value;
/** Guarded by "this" */
volatile boolean scheduled;


@Override
public int compareTo(ExpiringEntry<K, V> other) {
if (key.equals(other.key))
return 0;
return expectedExpiration.get() < other.expectedExpiration.get() ? -1 : 1;
}
}

5.添加元素,过期key的实现逻辑

/**
* Puts the given key/value in storage, scheduling the new entry for expiration if needed. If a previous value existed
* for the given key, it is first cancelled and the entries reordered to reflect the new expiration.
*/
V putInternal(K key, V value, ExpirationPolicy expirationPolicy, long expirationNanos) {
writeLock.lock();
try {
ExpiringEntry<K, V> entry = entries.get(key);
V oldValue = null;

if (entry == null) {
entry = new ExpiringEntry<K, V>(key, value,
variableExpiration ? new AtomicReference<ExpirationPolicy>(expirationPolicy) : this.expirationPolicy,
variableExpiration ? new AtomicLong(expirationNanos) : this.expirationNanos);
if (entries.size() >= maxSize) {
ExpiringEntry<K, V> expiredEntry = entries.first();
entries.remove(expiredEntry.key);
notifyListeners(expiredEntry);
}
entries.put(key, entry);
if (entries.size() == 1 || entries.first().equals(entry))
scheduleEntry(entry);
} else {
oldValue = entry.getValue();
if (!ExpirationPolicy.ACCESSED.equals(expirationPolicy)
&& ((oldValue == null && value == null) || (oldValue != null && oldValue.equals(value))))
return value;

entry.setValue(value);
resetEntry(entry, false);
}

return oldValue;
} finally {
writeLock.unlock();
}
}
 /**
* Schedules an entry for expiration. Guards against concurrent schedule/schedule, cancel/schedule and schedule/cancel
* calls.
*
* @param entry Entry to schedule
*/
void scheduleEntry(ExpiringEntry<K, V> entry) {
if (entry == null || entry.scheduled)
return;

Runnable runnable = null;
synchronized (entry) {
if (entry.scheduled)
return;

final WeakReference<ExpiringEntry<K, V>> entryReference = new WeakReference<ExpiringEntry<K, V>>(entry);
runnable = new Runnable() {
@Override
public void run() {
ExpiringEntry<K, V> entry = entryReference.get();

writeLock.lock();
try {
if (entry != null && entry.scheduled) {
entries.remove(entry.key);
notifyListeners(entry);
}

try {
// Expires entries and schedules the next entry
Iterator<ExpiringEntry<K, V>> iterator = entries.valuesIterator();
boolean schedulePending = true;

while (iterator.hasNext() && schedulePending) {
ExpiringEntry<K, V> nextEntry = iterator.next();
if (nextEntry.expectedExpiration.get() <= System.nanoTime()) {
iterator.remove();
notifyListeners(nextEntry);
} else {
scheduleEntry(nextEntry);
schedulePending = false;
}
}
} catch (NoSuchElementException ignored) {
}
} finally {
writeLock.unlock();
}
}
};
// 添加延时任务
Future<?> entryFuture = EXPIRER.schedule(runnable, entry.expectedExpiration.get() - System.nanoTime(),
TimeUnit.NANOSECONDS);
entry.schedule(entryFuture);
}
}

总结

1.实测内存,CPU占用,以及失效监听触发延时都优于使用redis

2.符合业务需求

最后

每个需求都有不同的解决方案,不要吊死在一棵树上!