一只会飞的旺旺
文章153
标签131
分类7
告警去重与延时恢复处理

告警去重与延时恢复处理

小卖铺上新啦!ChatGPT账号大甩卖! 一键直达

业务需求

需要实现一个告警指定时间(N)内去重,并且定时(M)自动恢复的功能,即规定时间(N)内,不再发送同一个告警,并且当等待一段时间(M),若未再接收到告警,则触发消警(自动恢复)操作.

思路

  1. 使用rabbitmq的死信队列
  2. 使用redis,给key添加过期监听事件
  3. 使用ExpiringMap,带有过期时间的map

实践

rabbitmq

不符合业务需求,当一条同类型告警来临时,需要更新消息的失效时间,而mq进入队列的消息无法修改

redis

使用步骤

1.修改redis配置文件,notify-keyspace-events设置为Ex

notify-keyspace-events Ex

K     键空间事件,以__keyspace@<db>__前缀发布。
E     键事件事件,以__keyevent@<db>__前缀发布。
g     通用命令(非类型特定),如DEL,EXPIRE,RENAME等等
$     字符串命令
l     列表命令
s     集合命令
h     哈希命令
z     有序集合命令
x     过期事件(每次键到期时生成的事件)
e     被驱逐的事件(当一个键由于达到最大内存而被驱逐时产生的事件)
A     g$lshzxe的别名,因此字符串AKE表示所有的事件。

2.程序测试

    @Override
    public void run(String... args) throws Exception {
        SimpleDateFormat ft1 = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
        for (int i = 1; i < 50000; i++) {
            Calendar instance = Calendar.getInstance();
            instance.add(Calendar.SECOND, i * 5);
            Date date = instance.getTime();
            redisTemplate.opsForValue().set( i + "@" + ft1.format(date), "value", i * 5, TimeUnit.SECONDS);
        }

        System.out.println("初始化key");
        Thread.sleep(10000000);
        System.out.println("休眠结束");
    }

image-20220928202559954

性能测试

./redis-benchmark -h 127.0.0.1 -p 6379 -c 20 -n 1000000 -t get -d 10 -P 8 -q

命令说明:

​ 向127.0.0.1:6379这个Redis发送100万个请求,使用20个长连接发送,所有请求都是get命令,每个get命令的包体为10字节,使用8条Pipeline通道发送,并且只显示requests per second这一个结果。

参数说明:

-h 目标Redis服务网络地址

-p 目标Reids服务的端口

-c 客户端并发长连接数

-n 本次测试需要发起的请求数

-t 测试请求的方法

-d 测试请求的数据大小 字节

-P 开启Pipeline模式,并制定Pipeline通道数量

-q 只显示requests per second这一个结果

本地redis测试

连接数 字节长度 请求数 整体RPS 单连接RPS
1 10 1000000 52408.16 52408.16
1 59 1000000 53610.68 53610.68
1 100 1000000 53963.63 53963.63
1 1000 1000000 54451.40 54451.40
1 10000 1000000 54725.55 54725.55
5 59 1000000 67467.28 13493.45
10 59 1000000 70407.66 7040.76
50 59 1000000 60034.82 1200.69

远程redis测试

连接数 字节长度 请求数 整体RPS 单连接RPS 不开pipeline
1 10 1000000 12745.13 12745.13 1648.67
1 59 1000000 12521.28 12521.28 1695.92
1 100 1000000 12706.53 12706.53 1651.67
1 1000 1000000 12842.72 12842.72 1611.79
1 10000 1000000 13395.23 13395.23 1695.62
5 59 1000000 54580.76 10916.15
10 59 1000000 98570.72 9857.07
50 59 1000000 79027.84 1580.55

程序内存,CPU占用

image-20220928202610650

image-20220928202623004

总结

1.redis中存在10000左右过期key时,会导致监听有延迟出现,可能高达几分钟.

2.当不在同一台服务器上时,redis的时间需要与程序的时间同步.

3.redis中存入key的长度,在1000以下时,影响不大,存在跨服务器时,性能影响较大,可能与服务器性能,网络等多方面因素有关

ExpiringMap

开源地址

简介

1.可设置Map中的Entry在一段时间后自动过期。
2.可设置Map最大容纳值,当到达Maximum size后,再次插入值会导致Map中的第一个值过期。
3.可添加监听事件,在监听到Entry过期时调度监听函数。
4.可以设置懒加载,在调用get()方法时创建对象。

使用步骤

1.添加maven依赖

<dependency> 
    <groupId>net.jodah</groupId> 
    <artifactId>expiringmap</artifactId> 
    <version>0.5.8</version> 
</dependency> 

2.测试代码

// 测试监听时间有误差  
@Test
    public void test9() throws InterruptedException {
        ExpiringMap<String, String> map = ExpiringMap.builder()
                .maxSize(100)
                .variableExpiration()
                .expirationPolicy(ExpirationPolicy.ACCESSED)
            // 一个同步,一个异步
                .asyncExpirationListener((ExpirationListener<String, String>) (key, value) -> System.out.println("key:" + key + " value:" + value + "\n失效时间:" + System.currentTimeMillis()))
                .build();
        System.out.println("当前时间:" + System.currentTimeMillis());
        map.put("test", "test123", 6, TimeUnit.SECONDS);
        Thread.sleep(5000);
        System.err.println(map.get("test"));
        Thread.sleep(10000);
        System.err.println(map.get("test"));
    }

3.实测误差

image-20220928202632405

性能测试

image-20220928202641748

image-20220928202650512

源码解析

1.构建

/**
*
* maxSize:map的最大长度,添加第1001个entry时,会导致第1个马上过期(即使没到过期时间)
* expiration:过期时间和过期单位,设置过期时间,则永久有效.
* expirationPolicy:过期策略的使用
*      CREATED:  在每次更新元素时,过期倒计时清零
*      ACCESSED: 在每次访问元素时,过期倒计时清零
*
* variableExpiration:允许更新过期时间值,如果不设置variableExpiration
*      不允许更改过期时间,一旦执行更改过期时间的操作则会抛出UnsupportedOperationException异常
* expirationListener:同步过期监听
* asyncExpirationListener:异步过期监听
* entryLoader:懒加载,调用get方法时若key不存在创建默认value
*
*/
ExpiringMap<String, String> map = ExpiringMap.builder()
    .maxSize(100000)
    .expiration(1, TimeUnit.SECONDS)
    .expirationPolicy(ExpirationPolicy.ACCESSED)
    .variableExpiration()
    .asyncExpirationListener(ExpiringMapTest::remindAsyncExpiration)
    .entryLoader(name -> "default")
    .build();

2.初始化

  private ExpiringMap(final Builder<K, V> builder) {
    if (EXPIRER == null) {
      synchronized (ExpiringMap.class) {
        if (EXPIRER == null) {
          EXPIRER = Executors.newSingleThreadScheduledExecutor(
              THREAD_FACTORY == null ? new NamedThreadFactory("ExpiringMap-Expirer") : THREAD_FACTORY);
        }
      }
    }

    if (LISTENER_SERVICE == null && builder.asyncExpirationListeners != null) {
      synchronized (ExpiringMap.class) {
        if (LISTENER_SERVICE == null) {
          LISTENER_SERVICE = (ThreadPoolExecutor) Executors.newCachedThreadPool(
              THREAD_FACTORY == null ? new NamedThreadFactory("ExpiringMap-Listener-%s") : THREAD_FACTORY);
        }
      }
    }

    variableExpiration = builder.variableExpiration;
    entries = variableExpiration ? new EntryTreeHashMap<K, V>() : new EntryLinkedHashMap<K, V>();
    if (builder.expirationListeners != null)
      expirationListeners = new CopyOnWriteArrayList<ExpirationListener<K, V>>(builder.expirationListeners);
    if (builder.asyncExpirationListeners != null)
      asyncExpirationListeners = new CopyOnWriteArrayList<ExpirationListener<K, V>>(builder.asyncExpirationListeners);
    expirationPolicy = new AtomicReference<ExpirationPolicy>(builder.expirationPolicy);
    expirationNanos = new AtomicLong(TimeUnit.NANOSECONDS.convert(builder.duration, builder.timeUnit));
    maxSize = builder.maxSize;
    entryLoader = builder.entryLoader;
    expiringEntryLoader = builder.expiringEntryLoader;
  }

3.过期key存储,EntryMap,EntryLinkedHashMap,EntryTreeHashMap

 private interface EntryMap<K, V> extends Map<K, ExpiringEntry<K, V>> {
    /** Returns the first entry in the map or null if the map is empty. */
    ExpiringEntry<K, V> first();

    /**
     * Reorders the given entry in the map.
     * 
     * @param entry to reorder
     */
    void reorder(ExpiringEntry<K, V> entry);

    /** Returns a values iterator. */
    Iterator<ExpiringEntry<K, V>> valuesIterator();
  }

4.过期map实体ExpiringEntry,重写了compareTo方法,按过期时间从小到大排序

 static class ExpiringEntry<K, V> implements Comparable<ExpiringEntry<K, V>> {
    final AtomicLong expirationNanos;
    /** Epoch time at which the entry is expected to expire */
    final AtomicLong expectedExpiration;
    final AtomicReference<ExpirationPolicy> expirationPolicy;
    final K key;
    /** Guarded by "this" */
    volatile Future<?> entryFuture;
    /** Guarded by "this" */
    V value;
    /** Guarded by "this" */
    volatile boolean scheduled;
 
     
     @Override
    public int compareTo(ExpiringEntry<K, V> other) {
      if (key.equals(other.key))
        return 0;
      return expectedExpiration.get() < other.expectedExpiration.get() ? -1 : 1;
    }
 }

5.添加元素,过期key的实现逻辑

  /**
   * Puts the given key/value in storage, scheduling the new entry for expiration if needed. If a previous value existed
   * for the given key, it is first cancelled and the entries reordered to reflect the new expiration.
   */
  V putInternal(K key, V value, ExpirationPolicy expirationPolicy, long expirationNanos) {
    writeLock.lock();
    try {
      ExpiringEntry<K, V> entry = entries.get(key);
      V oldValue = null;

      if (entry == null) {
        entry = new ExpiringEntry<K, V>(key, value,
            variableExpiration ? new AtomicReference<ExpirationPolicy>(expirationPolicy) : this.expirationPolicy,
            variableExpiration ? new AtomicLong(expirationNanos) : this.expirationNanos);
        if (entries.size() >= maxSize) {
          ExpiringEntry<K, V> expiredEntry = entries.first();
          entries.remove(expiredEntry.key);
          notifyListeners(expiredEntry);
        }
        entries.put(key, entry);
        if (entries.size() == 1 || entries.first().equals(entry))
          scheduleEntry(entry);
      } else {
        oldValue = entry.getValue();
        if (!ExpirationPolicy.ACCESSED.equals(expirationPolicy)
            && ((oldValue == null && value == null) || (oldValue != null && oldValue.equals(value))))
          return value;

        entry.setValue(value);
        resetEntry(entry, false);
      }

      return oldValue;
    } finally {
      writeLock.unlock();
    }
  }
  /**
   * Schedules an entry for expiration. Guards against concurrent schedule/schedule, cancel/schedule and schedule/cancel
   * calls.
   * 
   * @param entry Entry to schedule
   */
  void scheduleEntry(ExpiringEntry<K, V> entry) {
    if (entry == null || entry.scheduled)
      return;

    Runnable runnable = null;
    synchronized (entry) {
      if (entry.scheduled)
        return;

      final WeakReference<ExpiringEntry<K, V>> entryReference = new WeakReference<ExpiringEntry<K, V>>(entry);
      runnable = new Runnable() {
        @Override
        public void run() {
          ExpiringEntry<K, V> entry = entryReference.get();

          writeLock.lock();
          try {
            if (entry != null && entry.scheduled) {
              entries.remove(entry.key);
              notifyListeners(entry);
            }

            try {
              // Expires entries and schedules the next entry
              Iterator<ExpiringEntry<K, V>> iterator = entries.valuesIterator();
              boolean schedulePending = true;

              while (iterator.hasNext() && schedulePending) {
                ExpiringEntry<K, V> nextEntry = iterator.next();
                if (nextEntry.expectedExpiration.get() <= System.nanoTime()) {
                  iterator.remove();
                  notifyListeners(nextEntry);
                } else {
                  scheduleEntry(nextEntry);
                  schedulePending = false;
                }
              }
            } catch (NoSuchElementException ignored) {
            }
          } finally {
            writeLock.unlock();
          }
        }
      };
    // 添加延时任务
      Future<?> entryFuture = EXPIRER.schedule(runnable, entry.expectedExpiration.get() - System.nanoTime(),
          TimeUnit.NANOSECONDS);
      entry.schedule(entryFuture);
    }
  }

总结

1.实测内存,CPU占用,以及失效监听触发延时都优于使用redis

2.符合业务需求

最后

每个需求都有不同的解决方案,不要吊死在一棵树上!

微信支付码 微信支付
支付宝支付码 支付宝支付