业务需求
需要实现一个告警指定时间(N)内去重,并且定时(M)自动恢复的功能,即规定时间(N)内,不再发送同一个告警,并且当等待一段时间(M),若未再接收到告警,则触发消警(自动恢复)操作.
思路
- 使用rabbitmq的死信队列
- 使用redis,给key添加过期监听事件
- 使用ExpiringMap,带有过期时间的map
实践
rabbitmq
不符合业务需求,当一条同类型告警来临时,需要更新消息的失效时间,而mq进入队列的消息无法修改
redis
使用步骤
1.修改redis配置文件,notify-keyspace-events设置为Ex
notify-keyspace-events Ex
K 键空间事件,以__keyspace@<db>__前缀发布。
E 键事件事件,以__keyevent@<db>__前缀发布。
g 通用命令(非类型特定),如DEL,EXPIRE,RENAME等等
$ 字符串命令
l 列表命令
s 集合命令
h 哈希命令
z 有序集合命令
x 过期事件(每次键到期时生成的事件)
e 被驱逐的事件(当一个键由于达到最大内存而被驱逐时产生的事件)
A g$lshzxe的别名,因此字符串AKE表示所有的事件。
2.程序测试
@Override
public void run(String... args) throws Exception {
SimpleDateFormat ft1 = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
for (int i = 1; i < 50000; i++) {
Calendar instance = Calendar.getInstance();
instance.add(Calendar.SECOND, i * 5);
Date date = instance.getTime();
redisTemplate.opsForValue().set( i + "@" + ft1.format(date), "value", i * 5, TimeUnit.SECONDS);
}
System.out.println("初始化key");
Thread.sleep(10000000);
System.out.println("休眠结束");
}
性能测试
./redis-benchmark -h 127.0.0.1 -p 6379 -c 20 -n 1000000 -t get -d 10 -P 8 -q
命令说明:
向127.0.0.1:6379这个Redis发送100万个请求,使用20个长连接发送,所有请求都是get命令,每个get命令的包体为10字节,使用8条Pipeline通道发送,并且只显示requests per second这一个结果。
参数说明:
-h 目标Redis服务网络地址
-p 目标Reids服务的端口
-c 客户端并发长连接数
-n 本次测试需要发起的请求数
-t 测试请求的方法
-d 测试请求的数据大小 字节
-P 开启Pipeline模式,并制定Pipeline通道数量
-q 只显示requests per second这一个结果
本地redis测试
连接数 | 字节长度 | 请求数 | 整体RPS | 单连接RPS |
---|---|---|---|---|
1 | 10 | 1000000 | 52408.16 | 52408.16 |
1 | 59 | 1000000 | 53610.68 | 53610.68 |
1 | 100 | 1000000 | 53963.63 | 53963.63 |
1 | 1000 | 1000000 | 54451.40 | 54451.40 |
1 | 10000 | 1000000 | 54725.55 | 54725.55 |
5 | 59 | 1000000 | 67467.28 | 13493.45 |
10 | 59 | 1000000 | 70407.66 | 7040.76 |
50 | 59 | 1000000 | 60034.82 | 1200.69 |
远程redis测试
连接数 | 字节长度 | 请求数 | 整体RPS | 单连接RPS | 不开pipeline |
---|---|---|---|---|---|
1 | 10 | 1000000 | 12745.13 | 12745.13 | 1648.67 |
1 | 59 | 1000000 | 12521.28 | 12521.28 | 1695.92 |
1 | 100 | 1000000 | 12706.53 | 12706.53 | 1651.67 |
1 | 1000 | 1000000 | 12842.72 | 12842.72 | 1611.79 |
1 | 10000 | 1000000 | 13395.23 | 13395.23 | 1695.62 |
5 | 59 | 1000000 | 54580.76 | 10916.15 | |
10 | 59 | 1000000 | 98570.72 | 9857.07 | |
50 | 59 | 1000000 | 79027.84 | 1580.55 |
程序内存,CPU占用
总结
1.redis中存在10000左右过期key时,会导致监听有延迟出现,可能高达几分钟.
2.当不在同一台服务器上时,redis的时间需要与程序的时间同步.
3.redis中存入key的长度,在1000以下时,影响不大,存在跨服务器时,性能影响较大,可能与服务器性能,网络等多方面因素有关
ExpiringMap
简介
1.可设置Map中的Entry在一段时间后自动过期。
2.可设置Map最大容纳值,当到达Maximum size后,再次插入值会导致Map中的第一个值过期。
3.可添加监听事件,在监听到Entry过期时调度监听函数。
4.可以设置懒加载,在调用get()方法时创建对象。
使用步骤
1.添加maven依赖
<dependency>
<groupId>net.jodah</groupId>
<artifactId>expiringmap</artifactId>
<version>0.5.8</version>
</dependency>
2.测试代码
// 测试监听时间有误差
@Test
public void test9() throws InterruptedException {
ExpiringMap<String, String> map = ExpiringMap.builder()
.maxSize(100)
.variableExpiration()
.expirationPolicy(ExpirationPolicy.ACCESSED)
// 一个同步,一个异步
.asyncExpirationListener((ExpirationListener<String, String>) (key, value) -> System.out.println("key:" + key + " value:" + value + "\n失效时间:" + System.currentTimeMillis()))
.build();
System.out.println("当前时间:" + System.currentTimeMillis());
map.put("test", "test123", 6, TimeUnit.SECONDS);
Thread.sleep(5000);
System.err.println(map.get("test"));
Thread.sleep(10000);
System.err.println(map.get("test"));
}
3.实测误差
性能测试
源码解析
1.构建
/**
*
* maxSize:map的最大长度,添加第1001个entry时,会导致第1个马上过期(即使没到过期时间)
* expiration:过期时间和过期单位,设置过期时间,则永久有效.
* expirationPolicy:过期策略的使用
* CREATED: 在每次更新元素时,过期倒计时清零
* ACCESSED: 在每次访问元素时,过期倒计时清零
*
* variableExpiration:允许更新过期时间值,如果不设置variableExpiration
* 不允许更改过期时间,一旦执行更改过期时间的操作则会抛出UnsupportedOperationException异常
* expirationListener:同步过期监听
* asyncExpirationListener:异步过期监听
* entryLoader:懒加载,调用get方法时若key不存在创建默认value
*
*/
ExpiringMap<String, String> map = ExpiringMap.builder()
.maxSize(100000)
.expiration(1, TimeUnit.SECONDS)
.expirationPolicy(ExpirationPolicy.ACCESSED)
.variableExpiration()
.asyncExpirationListener(ExpiringMapTest::remindAsyncExpiration)
.entryLoader(name -> "default")
.build();
2.初始化
private ExpiringMap(final Builder<K, V> builder) {
if (EXPIRER == null) {
synchronized (ExpiringMap.class) {
if (EXPIRER == null) {
EXPIRER = Executors.newSingleThreadScheduledExecutor(
THREAD_FACTORY == null ? new NamedThreadFactory("ExpiringMap-Expirer") : THREAD_FACTORY);
}
}
}
if (LISTENER_SERVICE == null && builder.asyncExpirationListeners != null) {
synchronized (ExpiringMap.class) {
if (LISTENER_SERVICE == null) {
LISTENER_SERVICE = (ThreadPoolExecutor) Executors.newCachedThreadPool(
THREAD_FACTORY == null ? new NamedThreadFactory("ExpiringMap-Listener-%s") : THREAD_FACTORY);
}
}
}
variableExpiration = builder.variableExpiration;
entries = variableExpiration ? new EntryTreeHashMap<K, V>() : new EntryLinkedHashMap<K, V>();
if (builder.expirationListeners != null)
expirationListeners = new CopyOnWriteArrayList<ExpirationListener<K, V>>(builder.expirationListeners);
if (builder.asyncExpirationListeners != null)
asyncExpirationListeners = new CopyOnWriteArrayList<ExpirationListener<K, V>>(builder.asyncExpirationListeners);
expirationPolicy = new AtomicReference<ExpirationPolicy>(builder.expirationPolicy);
expirationNanos = new AtomicLong(TimeUnit.NANOSECONDS.convert(builder.duration, builder.timeUnit));
maxSize = builder.maxSize;
entryLoader = builder.entryLoader;
expiringEntryLoader = builder.expiringEntryLoader;
}
3.过期key存储,EntryMap,EntryLinkedHashMap,EntryTreeHashMap
private interface EntryMap<K, V> extends Map<K, ExpiringEntry<K, V>> {
/** Returns the first entry in the map or null if the map is empty. */
ExpiringEntry<K, V> first();
/**
* Reorders the given entry in the map.
*
* @param entry to reorder
*/
void reorder(ExpiringEntry<K, V> entry);
/** Returns a values iterator. */
Iterator<ExpiringEntry<K, V>> valuesIterator();
}
4.过期map实体ExpiringEntry,重写了compareTo方法,按过期时间从小到大排序
static class ExpiringEntry<K, V> implements Comparable<ExpiringEntry<K, V>> {
final AtomicLong expirationNanos;
/** Epoch time at which the entry is expected to expire */
final AtomicLong expectedExpiration;
final AtomicReference<ExpirationPolicy> expirationPolicy;
final K key;
/** Guarded by "this" */
volatile Future<?> entryFuture;
/** Guarded by "this" */
V value;
/** Guarded by "this" */
volatile boolean scheduled;
@Override
public int compareTo(ExpiringEntry<K, V> other) {
if (key.equals(other.key))
return 0;
return expectedExpiration.get() < other.expectedExpiration.get() ? -1 : 1;
}
}
5.添加元素,过期key的实现逻辑
/**
* Puts the given key/value in storage, scheduling the new entry for expiration if needed. If a previous value existed
* for the given key, it is first cancelled and the entries reordered to reflect the new expiration.
*/
V putInternal(K key, V value, ExpirationPolicy expirationPolicy, long expirationNanos) {
writeLock.lock();
try {
ExpiringEntry<K, V> entry = entries.get(key);
V oldValue = null;
if (entry == null) {
entry = new ExpiringEntry<K, V>(key, value,
variableExpiration ? new AtomicReference<ExpirationPolicy>(expirationPolicy) : this.expirationPolicy,
variableExpiration ? new AtomicLong(expirationNanos) : this.expirationNanos);
if (entries.size() >= maxSize) {
ExpiringEntry<K, V> expiredEntry = entries.first();
entries.remove(expiredEntry.key);
notifyListeners(expiredEntry);
}
entries.put(key, entry);
if (entries.size() == 1 || entries.first().equals(entry))
scheduleEntry(entry);
} else {
oldValue = entry.getValue();
if (!ExpirationPolicy.ACCESSED.equals(expirationPolicy)
&& ((oldValue == null && value == null) || (oldValue != null && oldValue.equals(value))))
return value;
entry.setValue(value);
resetEntry(entry, false);
}
return oldValue;
} finally {
writeLock.unlock();
}
}
/**
* Schedules an entry for expiration. Guards against concurrent schedule/schedule, cancel/schedule and schedule/cancel
* calls.
*
* @param entry Entry to schedule
*/
void scheduleEntry(ExpiringEntry<K, V> entry) {
if (entry == null || entry.scheduled)
return;
Runnable runnable = null;
synchronized (entry) {
if (entry.scheduled)
return;
final WeakReference<ExpiringEntry<K, V>> entryReference = new WeakReference<ExpiringEntry<K, V>>(entry);
runnable = new Runnable() {
@Override
public void run() {
ExpiringEntry<K, V> entry = entryReference.get();
writeLock.lock();
try {
if (entry != null && entry.scheduled) {
entries.remove(entry.key);
notifyListeners(entry);
}
try {
// Expires entries and schedules the next entry
Iterator<ExpiringEntry<K, V>> iterator = entries.valuesIterator();
boolean schedulePending = true;
while (iterator.hasNext() && schedulePending) {
ExpiringEntry<K, V> nextEntry = iterator.next();
if (nextEntry.expectedExpiration.get() <= System.nanoTime()) {
iterator.remove();
notifyListeners(nextEntry);
} else {
scheduleEntry(nextEntry);
schedulePending = false;
}
}
} catch (NoSuchElementException ignored) {
}
} finally {
writeLock.unlock();
}
}
};
// 添加延时任务
Future<?> entryFuture = EXPIRER.schedule(runnable, entry.expectedExpiration.get() - System.nanoTime(),
TimeUnit.NANOSECONDS);
entry.schedule(entryFuture);
}
}
总结
1.实测内存,CPU占用,以及失效监听触发延时都优于使用redis
2.符合业务需求
最后
每个需求都有不同的解决方案,不要吊死在一棵树上!