From 58cb73f25c6ae68d87221e9d6ab28cb8e6bfff48 Mon Sep 17 00:00:00 2001 From: wangchao Date: Mon, 19 Mar 2018 10:54:08 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BA=8C=E7=BA=A7=E7=BC=93?= =?UTF-8?q?=E5=AD=98=EF=BC=8C=E5=90=8C=E6=97=B6=E5=A2=9E=E5=8A=A0L1\L2?= =?UTF-8?q?=E5=BC=82=E6=AD=A5=E5=AD=98=E5=82=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../org/tio/im/common/cache/IL2Cache.java | 12 ++++ .../caffeine/DefaultRemovalListener.java | 2 +- .../caffeineredis/CaffeineRedisCache.java | 18 ++++-- .../CaffeineRedisCacheManager.java | 10 ++- .../caffeineredis/RedisAsyncRunnable.java | 62 +++++++++++++++++++ .../common/cache/caffeineredis/RedisL2Vo.java | 47 ++++++++++++++ .../tio/im/common/cache/redis/ExpireVo.java | 16 ++++- .../common/cache/redis/JedisSubscriber.java | 4 +- .../tio/im/common/cache/redis/RedisCache.java | 2 +- .../cache/redis/RedisExpireUpdateTask.java | 53 ++++++++-------- .../im/common/cache/redis/SubRunnable.java | 2 +- 11 files changed, 186 insertions(+), 42 deletions(-) create mode 100644 tio-im-common/src/main/java/org/tio/im/common/cache/IL2Cache.java create mode 100644 tio-im-common/src/main/java/org/tio/im/common/cache/caffeineredis/RedisAsyncRunnable.java create mode 100644 tio-im-common/src/main/java/org/tio/im/common/cache/caffeineredis/RedisL2Vo.java diff --git a/tio-im-common/src/main/java/org/tio/im/common/cache/IL2Cache.java b/tio-im-common/src/main/java/org/tio/im/common/cache/IL2Cache.java new file mode 100644 index 0000000..4b1b954 --- /dev/null +++ b/tio-im-common/src/main/java/org/tio/im/common/cache/IL2Cache.java @@ -0,0 +1,12 @@ +package org.tio.im.common.cache; + +import java.io.Serializable; + +/** + * @author WChao + * @date 2018年3月13日 下午7:47:28 + */ +public interface IL2Cache { + + public void putL2Async(String key, Serializable value); +} diff --git a/tio-im-common/src/main/java/org/tio/im/common/cache/caffeine/DefaultRemovalListener.java b/tio-im-common/src/main/java/org/tio/im/common/cache/caffeine/DefaultRemovalListener.java index f7259a0..ef43cd4 100644 --- a/tio-im-common/src/main/java/org/tio/im/common/cache/caffeine/DefaultRemovalListener.java +++ b/tio-im-common/src/main/java/org/tio/im/common/cache/caffeine/DefaultRemovalListener.java @@ -33,7 +33,7 @@ public class DefaultRemovalListener implements RemovalListener { @Override public void onRemoval(K key, V value, RemovalCause cause) { - log.info("cacheName:{}, key:{}, value:{} was removed", cacheName, key, value); + log.debug("cacheName:{}, key:{}, value:{} was removed", cacheName, key, value); } } diff --git a/tio-im-common/src/main/java/org/tio/im/common/cache/caffeineredis/CaffeineRedisCache.java b/tio-im-common/src/main/java/org/tio/im/common/cache/caffeineredis/CaffeineRedisCache.java index e10c429..b21b85a 100644 --- a/tio-im-common/src/main/java/org/tio/im/common/cache/caffeineredis/CaffeineRedisCache.java +++ b/tio-im-common/src/main/java/org/tio/im/common/cache/caffeineredis/CaffeineRedisCache.java @@ -9,6 +9,7 @@ import org.slf4j.LoggerFactory; import org.tio.im.common.cache.CacheChangeType; import org.tio.im.common.cache.CacheChangedVo; import org.tio.im.common.cache.ICache; +import org.tio.im.common.cache.IL2Cache; import org.tio.im.common.cache.caffeine.CaffeineCache; import org.tio.im.common.cache.redis.JedisTemplate; import org.tio.im.common.cache.redis.RedisCache; @@ -18,7 +19,7 @@ import org.tio.im.common.cache.redis.RedisExpireUpdateTask; * @author WChao * 2017年8月12日 下午9:13:54 */ -public class CaffeineRedisCache implements ICache { +public class CaffeineRedisCache implements ICache,IL2Cache { Logger log = LoggerFactory.getLogger(CaffeineRedisCache.class); @@ -78,14 +79,14 @@ public class CaffeineRedisCache implements ICache { if (ret == null) { ret = redisCache.get(key); if (ret != null) { - log.info("Cache L2 (redis) :{}={}",key,ret); + log.debug("Cache L2 (redis) :{}={}",key,ret); caffeineCache.put(key, ret); } } else {//在本地就取到数据了,那么需要在redis那定时更新一下过期时间 - log.info("Cache L1 (caffeine) :{}={}",key,ret); + log.debug("Cache L1 (caffeine) :{}={}",key,ret); Long timeToIdleSeconds = redisCache.getTimeToIdleSeconds(); if (timeToIdleSeconds != null) { - RedisExpireUpdateTask.add(cacheName, key, timeToIdleSeconds); + RedisExpireUpdateTask.add(cacheName, key, ret , timeToIdleSeconds); } } return ret; @@ -117,6 +118,12 @@ public class CaffeineRedisCache implements ICache { } } + @Override + public void putL2Async(String key, Serializable value) { + caffeineCache.put(key, value); + CaffeineRedisCacheManager.getAsyncRedisQueue().add(new RedisL2Vo(redisCache, key, value)); + } + @Override public void putTemporary(String key, Serializable value) { caffeineCache.putTemporary(key, value); @@ -155,6 +162,5 @@ public class CaffeineRedisCache implements ICache { public RedisCache getRedisCache() { return redisCache; } - - + } diff --git a/tio-im-common/src/main/java/org/tio/im/common/cache/caffeineredis/CaffeineRedisCacheManager.java b/tio-im-common/src/main/java/org/tio/im/common/cache/caffeineredis/CaffeineRedisCacheManager.java index b0535ce..b8a60b9 100644 --- a/tio-im-common/src/main/java/org/tio/im/common/cache/caffeineredis/CaffeineRedisCacheManager.java +++ b/tio-im-common/src/main/java/org/tio/im/common/cache/caffeineredis/CaffeineRedisCacheManager.java @@ -27,15 +27,15 @@ public class CaffeineRedisCacheManager { private static boolean inited = false; public static final String CACHE_CHANGE_TOPIC = "REDIS_CACHE_CHANGE_TOPIC_CAFFEINE"; - - + //L2异步存储队列; + private static RedisAsyncRunnable asyncRedisQueue = new RedisAsyncRunnable(); /** * 在本地最大的过期时间,这样可以防止内存爆掉,单位:秒 */ public static int MAX_EXPIRE_IN_LOCAL = 1800; private CaffeineRedisCacheManager(){} - + static{ try{ List configurations = CaffeineConfigurationFactory.parseConfiguration(); @@ -60,6 +60,7 @@ public class CaffeineRedisCacheManager { synchronized (CaffeineRedisCacheManager.class) { if (!inited) { new Thread(new SubRunnable(CACHE_CHANGE_TOPIC)).start(); + new Thread(asyncRedisQueue).start(); inited = true; } } @@ -94,4 +95,7 @@ public class CaffeineRedisCacheManager { return caffeineRedisCache; } + public static RedisAsyncRunnable getAsyncRedisQueue() { + return asyncRedisQueue; + } } diff --git a/tio-im-common/src/main/java/org/tio/im/common/cache/caffeineredis/RedisAsyncRunnable.java b/tio-im-common/src/main/java/org/tio/im/common/cache/caffeineredis/RedisAsyncRunnable.java new file mode 100644 index 0000000..9171305 --- /dev/null +++ b/tio-im-common/src/main/java/org/tio/im/common/cache/caffeineredis/RedisAsyncRunnable.java @@ -0,0 +1,62 @@ +package org.tio.im.common.cache.caffeineredis; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.LinkedBlockingQueue; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.tio.im.common.cache.CacheChangeType; +import org.tio.im.common.cache.CacheChangedVo; +import org.tio.im.common.cache.redis.JedisTemplate; +/** + * @author WChao + * @date 2018年3月13日 下午7:59:20 + */ +public class RedisAsyncRunnable implements Runnable{ + + private LinkedBlockingQueue redisL2VoQueue = new LinkedBlockingQueue(); + private static boolean started = false; + private Logger LOG = LoggerFactory.getLogger(RedisAsyncRunnable.class); + + public void add(RedisL2Vo redisL2Vo){ + this.redisL2VoQueue.offer(redisL2Vo); + } + @Override + public void run() { + if (started) { + return; + } + synchronized (RedisAsyncRunnable.class) { + if (started) { + return; + } + started = true; + } + List l2Datas = new ArrayList(); + while(true){ + try { + if(l2Datas.size() == 2000 || redisL2VoQueue.isEmpty()){//2000一提交(防止频繁访问Redis网络I/O消耗压力) + for(RedisL2Vo redisL2Vo : l2Datas){ + redisL2Vo.getRedisCache().put(redisL2Vo.getKey(),redisL2Vo.getValue()); + CacheChangedVo cacheChangedVo = new CacheChangedVo(redisL2Vo.getRedisCache().getCacheName(), redisL2Vo.getKey(), CacheChangeType.PUT); + JedisTemplate.me().publish(CaffeineRedisCacheManager.CACHE_CHANGE_TOPIC,cacheChangedVo.toString()); + } + l2Datas.clear(); + try { + Thread.sleep(1000 * 10); + } catch (InterruptedException e) { + LOG.error(e.toString(), e); + } + }else{ + RedisL2Vo redisL2Vo = redisL2VoQueue.poll(); + if(redisL2Vo != null){ + l2Datas.add(redisL2Vo); + } + } + } catch (Exception e) { + LOG.error(e.toString(),e); + } + } + } +} diff --git a/tio-im-common/src/main/java/org/tio/im/common/cache/caffeineredis/RedisL2Vo.java b/tio-im-common/src/main/java/org/tio/im/common/cache/caffeineredis/RedisL2Vo.java new file mode 100644 index 0000000..885dff5 --- /dev/null +++ b/tio-im-common/src/main/java/org/tio/im/common/cache/caffeineredis/RedisL2Vo.java @@ -0,0 +1,47 @@ +package org.tio.im.common.cache.caffeineredis; + +import java.io.Serializable; + +import org.tio.im.common.cache.redis.RedisCache; + +/** + * @author WChao + * @date 2018年3月13日 下午8:05:50 + */ +public class RedisL2Vo { + + private RedisCache redisCache; + private String key; + private Serializable value; + + public RedisL2Vo(RedisCache redisCache , String key , Serializable value){ + this.redisCache = redisCache; + this.key = key; + this.value = value; + } + + public RedisCache getRedisCache() { + return redisCache; + } + + public void setRedisCache(RedisCache redisCache) { + this.redisCache = redisCache; + } + + public String getKey() { + return key; + } + + public void setKey(String key) { + this.key = key; + } + + public Serializable getValue() { + return value; + } + + public void setValue(Serializable value) { + this.value = value; + } + +} diff --git a/tio-im-common/src/main/java/org/tio/im/common/cache/redis/ExpireVo.java b/tio-im-common/src/main/java/org/tio/im/common/cache/redis/ExpireVo.java index e1837a3..afd3f82 100644 --- a/tio-im-common/src/main/java/org/tio/im/common/cache/redis/ExpireVo.java +++ b/tio-im-common/src/main/java/org/tio/im/common/cache/redis/ExpireVo.java @@ -1,5 +1,6 @@ package org.tio.im.common.cache.redis; +import java.io.Serializable; import java.util.Objects; /** @@ -31,15 +32,17 @@ public class ExpireVo { private String cacheName; private String key; + + private Serializable value ; private long expire; - public ExpireVo(String cacheName, String key, long expire) { + public ExpireVo(String cacheName, String key, Serializable value , long expire) { super(); this.cacheName = cacheName; this.key = key; this.expire = expire; - // this.expirable = expirable; + this.value = value; } @Override @@ -90,4 +93,13 @@ public class ExpireVo { public void setKey(String key) { this.key = key; } + + public Serializable getValue() { + return value; + } + + public void setValue(Serializable value) { + this.value = value; + } + } diff --git a/tio-im-common/src/main/java/org/tio/im/common/cache/redis/JedisSubscriber.java b/tio-im-common/src/main/java/org/tio/im/common/cache/redis/JedisSubscriber.java index 3af0c24..e975cbf 100644 --- a/tio-im-common/src/main/java/org/tio/im/common/cache/redis/JedisSubscriber.java +++ b/tio-im-common/src/main/java/org/tio/im/common/cache/redis/JedisSubscriber.java @@ -36,12 +36,12 @@ public class JedisSubscriber extends JedisPubSub { return; } if (Objects.equals(CacheChangedVo.CLIENTID, clientid)) { - log.info("自己发布的消息,{}", clientid); + log.debug("自己发布的消息,{}", clientid); return; } CaffeineRedisCache caffeineRedisCache = CaffeineRedisCacheManager.getCache(cacheName); if (caffeineRedisCache == null) { - log.info("不能根据cacheName[{}]找到CaffeineRedisCache对象", cacheName); + log.debug("不能根据cacheName[{}]找到CaffeineRedisCache对象", cacheName); return; } if (type == CacheChangeType.PUT || type == CacheChangeType.UPDATE || type == CacheChangeType.REMOVE) { diff --git a/tio-im-common/src/main/java/org/tio/im/common/cache/redis/RedisCache.java b/tio-im-common/src/main/java/org/tio/im/common/cache/redis/RedisCache.java index c1c1c60..844d5cc 100644 --- a/tio-im-common/src/main/java/org/tio/im/common/cache/redis/RedisCache.java +++ b/tio-im-common/src/main/java/org/tio/im/common/cache/redis/RedisCache.java @@ -68,7 +68,7 @@ public class RedisCache implements ICache { value = JedisTemplate.me().get(cacheKey(cacheName, key), Serializable.class); if (timeToIdleSeconds != null) { if (value != null) { - RedisExpireUpdateTask.add(cacheName, key, timeout); + RedisExpireUpdateTask.add(cacheName, key, value ,timeout); } } } catch (Exception e) { diff --git a/tio-im-common/src/main/java/org/tio/im/common/cache/redis/RedisExpireUpdateTask.java b/tio-im-common/src/main/java/org/tio/im/common/cache/redis/RedisExpireUpdateTask.java index 326769a..341540e 100644 --- a/tio-im-common/src/main/java/org/tio/im/common/cache/redis/RedisExpireUpdateTask.java +++ b/tio-im-common/src/main/java/org/tio/im/common/cache/redis/RedisExpireUpdateTask.java @@ -1,13 +1,12 @@ package org.tio.im.common.cache.redis; import java.io.Serializable; -import java.util.HashSet; -import java.util.Set; -import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.LinkedBlockingQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.tio.utils.lock.SetWithLock; /** * 定时更新redis的过期时间 @@ -19,13 +18,11 @@ public class RedisExpireUpdateTask { private static boolean started = false; - private static Set set = new HashSet<>(); + private static LinkedBlockingQueue redisExpireVoQueue = new LinkedBlockingQueue(); - private static SetWithLock setWithLock = new SetWithLock(set); - - public static void add(String cacheName, String key, long expire) { - ExpireVo expireVo = new ExpireVo(cacheName, key, expire); - setWithLock.add(expireVo); + public static void add(String cacheName, String key, Serializable value, long expire) { + ExpireVo expireVo = new ExpireVo(cacheName, key, value, expire); + redisExpireVoQueue.offer(expireVo); } public static void start() { @@ -42,27 +39,31 @@ public class RedisExpireUpdateTask { new Thread(new Runnable() { @Override public void run() { + List l2Datas = new ArrayList(); while (true) { - WriteLock writeLock = setWithLock.getLock().writeLock(); - writeLock.lock(); try { - Set set = setWithLock.getObj(); - for (ExpireVo expireVo : set) { - log.debug("更新缓存过期时间, cacheName:{}, key:{}, expire:{}", expireVo.getCacheName(), expireVo.getKey(), expireVo.getExpire()); - Serializable value = RedisCacheManager.getCache(expireVo.getCacheName()).get(expireVo.getKey()); - if(value != null) - JedisTemplate.me().set(expireVo.getCacheName()+":"+expireVo.getKey(), value, Integer.parseInt(expireVo.getExpire()+"")); + if(l2Datas.size() == 2000 || redisExpireVoQueue.isEmpty()){//2000一提交(防止频繁访问Redis网络I/O消耗压力) + for (ExpireVo expireVo : l2Datas) { + log.debug("更新缓存过期时间, cacheName:{}, key:{}, expire:{}", expireVo.getCacheName(), expireVo.getKey(), expireVo.getExpire()); + Serializable value = expireVo.getValue(); + if(value != null) + JedisTemplate.me().set(expireVo.getCacheName()+":"+expireVo.getKey(), value, Integer.parseInt(expireVo.getExpire()+"")); + } + l2Datas.clear(); + try { + Thread.sleep(1000 * 10); + } catch (InterruptedException e) { + log.error(e.toString(), e); + } + } + else{ + ExpireVo expireVo = redisExpireVoQueue.poll(); + if(expireVo != null){ + l2Datas.add(expireVo); + } } - set.clear(); } catch (Throwable e) { log.error(e.getMessage(), e); - } finally { - writeLock.unlock(); - try { - Thread.sleep(1000 * 10); - } catch (InterruptedException e) { - log.error(e.toString(), e); - } } } diff --git a/tio-im-common/src/main/java/org/tio/im/common/cache/redis/SubRunnable.java b/tio-im-common/src/main/java/org/tio/im/common/cache/redis/SubRunnable.java index 4e55a25..de5c4dc 100644 --- a/tio-im-common/src/main/java/org/tio/im/common/cache/redis/SubRunnable.java +++ b/tio-im-common/src/main/java/org/tio/im/common/cache/redis/SubRunnable.java @@ -24,7 +24,7 @@ public class SubRunnable implements Runnable { } public void run() { - log.warn("订阅 redis , chanel {} , 线程将阻塞",subChannel); + log.debug("订阅 redis , chanel {} , 线程将阻塞",subChannel); try { jedis = JedisTemplate.me().getJedis(); if(jedis != null){