优化二级缓存,同时增加L1\L2异步存储

This commit is contained in:
wangchao 2018-03-19 10:54:08 +08:00
parent 1e3acaffc3
commit 58cb73f25c
11 changed files with 186 additions and 42 deletions

View File

@ -0,0 +1,12 @@
package org.tio.im.common.cache;
import java.io.Serializable;
/**
* @author WChao
* @date 2018年3月13日 下午7:47:28
*/
public interface IL2Cache {
public void putL2Async(String key, Serializable value);
}

View File

@ -33,7 +33,7 @@ public class DefaultRemovalListener<K, V> implements RemovalListener<K, V> {
@Override
public void onRemoval(K key, V value, RemovalCause cause) {
log.info("cacheName:{}, key:{}, value:{} was removed", cacheName, key, value);
log.debug("cacheName:{}, key:{}, value:{} was removed", cacheName, key, value);
}
}

View File

@ -9,6 +9,7 @@ import org.slf4j.LoggerFactory;
import org.tio.im.common.cache.CacheChangeType;
import org.tio.im.common.cache.CacheChangedVo;
import org.tio.im.common.cache.ICache;
import org.tio.im.common.cache.IL2Cache;
import org.tio.im.common.cache.caffeine.CaffeineCache;
import org.tio.im.common.cache.redis.JedisTemplate;
import org.tio.im.common.cache.redis.RedisCache;
@ -18,7 +19,7 @@ import org.tio.im.common.cache.redis.RedisExpireUpdateTask;
* @author WChao
* 2017年8月12日 下午9:13:54
*/
public class CaffeineRedisCache implements ICache {
public class CaffeineRedisCache implements ICache,IL2Cache {
Logger log = LoggerFactory.getLogger(CaffeineRedisCache.class);
@ -78,14 +79,14 @@ public class CaffeineRedisCache implements ICache {
if (ret == null) {
ret = redisCache.get(key);
if (ret != null) {
log.info("Cache L2 (redis) :{}={}",key,ret);
log.debug("Cache L2 (redis) :{}={}",key,ret);
caffeineCache.put(key, ret);
}
} else {//在本地就取到数据了那么需要在redis那定时更新一下过期时间
log.info("Cache L1 (caffeine) :{}={}",key,ret);
log.debug("Cache L1 (caffeine) :{}={}",key,ret);
Long timeToIdleSeconds = redisCache.getTimeToIdleSeconds();
if (timeToIdleSeconds != null) {
RedisExpireUpdateTask.add(cacheName, key, timeToIdleSeconds);
RedisExpireUpdateTask.add(cacheName, key, ret , timeToIdleSeconds);
}
}
return ret;
@ -117,6 +118,12 @@ public class CaffeineRedisCache implements ICache {
}
}
@Override
public void putL2Async(String key, Serializable value) {
caffeineCache.put(key, value);
CaffeineRedisCacheManager.getAsyncRedisQueue().add(new RedisL2Vo(redisCache, key, value));
}
@Override
public void putTemporary(String key, Serializable value) {
caffeineCache.putTemporary(key, value);
@ -155,6 +162,5 @@ public class CaffeineRedisCache implements ICache {
public RedisCache getRedisCache() {
return redisCache;
}
}

View File

@ -27,15 +27,15 @@ public class CaffeineRedisCacheManager {
private static boolean inited = false;
public static final String CACHE_CHANGE_TOPIC = "REDIS_CACHE_CHANGE_TOPIC_CAFFEINE";
//L2异步存储队列;
private static RedisAsyncRunnable asyncRedisQueue = new RedisAsyncRunnable();
/**
* 在本地最大的过期时间这样可以防止内存爆掉单位
*/
public static int MAX_EXPIRE_IN_LOCAL = 1800;
private CaffeineRedisCacheManager(){}
static{
try{
List<CaffeineConfiguration> configurations = CaffeineConfigurationFactory.parseConfiguration();
@ -60,6 +60,7 @@ public class CaffeineRedisCacheManager {
synchronized (CaffeineRedisCacheManager.class) {
if (!inited) {
new Thread(new SubRunnable(CACHE_CHANGE_TOPIC)).start();
new Thread(asyncRedisQueue).start();
inited = true;
}
}
@ -94,4 +95,7 @@ public class CaffeineRedisCacheManager {
return caffeineRedisCache;
}
public static RedisAsyncRunnable getAsyncRedisQueue() {
return asyncRedisQueue;
}
}

View File

@ -0,0 +1,62 @@
package org.tio.im.common.cache.caffeineredis;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.im.common.cache.CacheChangeType;
import org.tio.im.common.cache.CacheChangedVo;
import org.tio.im.common.cache.redis.JedisTemplate;
/**
* @author WChao
* @date 2018年3月13日 下午7:59:20
*/
public class RedisAsyncRunnable implements Runnable{
private LinkedBlockingQueue<RedisL2Vo> redisL2VoQueue = new LinkedBlockingQueue<RedisL2Vo>();
private static boolean started = false;
private Logger LOG = LoggerFactory.getLogger(RedisAsyncRunnable.class);
public void add(RedisL2Vo redisL2Vo){
this.redisL2VoQueue.offer(redisL2Vo);
}
@Override
public void run() {
if (started) {
return;
}
synchronized (RedisAsyncRunnable.class) {
if (started) {
return;
}
started = true;
}
List<RedisL2Vo> l2Datas = new ArrayList<RedisL2Vo>();
while(true){
try {
if(l2Datas.size() == 2000 || redisL2VoQueue.isEmpty()){//2000一提交(防止频繁访问Redis网络I/O消耗压力)
for(RedisL2Vo redisL2Vo : l2Datas){
redisL2Vo.getRedisCache().put(redisL2Vo.getKey(),redisL2Vo.getValue());
CacheChangedVo cacheChangedVo = new CacheChangedVo(redisL2Vo.getRedisCache().getCacheName(), redisL2Vo.getKey(), CacheChangeType.PUT);
JedisTemplate.me().publish(CaffeineRedisCacheManager.CACHE_CHANGE_TOPIC,cacheChangedVo.toString());
}
l2Datas.clear();
try {
Thread.sleep(1000 * 10);
} catch (InterruptedException e) {
LOG.error(e.toString(), e);
}
}else{
RedisL2Vo redisL2Vo = redisL2VoQueue.poll();
if(redisL2Vo != null){
l2Datas.add(redisL2Vo);
}
}
} catch (Exception e) {
LOG.error(e.toString(),e);
}
}
}
}

View File

@ -0,0 +1,47 @@
package org.tio.im.common.cache.caffeineredis;
import java.io.Serializable;
import org.tio.im.common.cache.redis.RedisCache;
/**
* @author WChao
* @date 2018年3月13日 下午8:05:50
*/
public class RedisL2Vo {
private RedisCache redisCache;
private String key;
private Serializable value;
public RedisL2Vo(RedisCache redisCache , String key , Serializable value){
this.redisCache = redisCache;
this.key = key;
this.value = value;
}
public RedisCache getRedisCache() {
return redisCache;
}
public void setRedisCache(RedisCache redisCache) {
this.redisCache = redisCache;
}
public String getKey() {
return key;
}
public void setKey(String key) {
this.key = key;
}
public Serializable getValue() {
return value;
}
public void setValue(Serializable value) {
this.value = value;
}
}

View File

@ -1,5 +1,6 @@
package org.tio.im.common.cache.redis;
import java.io.Serializable;
import java.util.Objects;
/**
@ -31,15 +32,17 @@ public class ExpireVo {
private String cacheName;
private String key;
private Serializable value ;
private long expire;
public ExpireVo(String cacheName, String key, long expire) {
public ExpireVo(String cacheName, String key, Serializable value , long expire) {
super();
this.cacheName = cacheName;
this.key = key;
this.expire = expire;
// this.expirable = expirable;
this.value = value;
}
@Override
@ -90,4 +93,13 @@ public class ExpireVo {
public void setKey(String key) {
this.key = key;
}
public Serializable getValue() {
return value;
}
public void setValue(Serializable value) {
this.value = value;
}
}

View File

@ -36,12 +36,12 @@ public class JedisSubscriber extends JedisPubSub {
return;
}
if (Objects.equals(CacheChangedVo.CLIENTID, clientid)) {
log.info("自己发布的消息,{}", clientid);
log.debug("自己发布的消息,{}", clientid);
return;
}
CaffeineRedisCache caffeineRedisCache = CaffeineRedisCacheManager.getCache(cacheName);
if (caffeineRedisCache == null) {
log.info("不能根据cacheName[{}]找到CaffeineRedisCache对象", cacheName);
log.debug("不能根据cacheName[{}]找到CaffeineRedisCache对象", cacheName);
return;
}
if (type == CacheChangeType.PUT || type == CacheChangeType.UPDATE || type == CacheChangeType.REMOVE) {

View File

@ -68,7 +68,7 @@ public class RedisCache implements ICache {
value = JedisTemplate.me().get(cacheKey(cacheName, key), Serializable.class);
if (timeToIdleSeconds != null) {
if (value != null) {
RedisExpireUpdateTask.add(cacheName, key, timeout);
RedisExpireUpdateTask.add(cacheName, key, value ,timeout);
}
}
} catch (Exception e) {

View File

@ -1,13 +1,12 @@
package org.tio.im.common.cache.redis;
import java.io.Serializable;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.utils.lock.SetWithLock;
/**
* 定时更新redis的过期时间
@ -19,13 +18,11 @@ public class RedisExpireUpdateTask {
private static boolean started = false;
private static Set<ExpireVo> set = new HashSet<>();
private static LinkedBlockingQueue<ExpireVo> redisExpireVoQueue = new LinkedBlockingQueue<ExpireVo>();
private static SetWithLock<ExpireVo> setWithLock = new SetWithLock<ExpireVo>(set);
public static void add(String cacheName, String key, long expire) {
ExpireVo expireVo = new ExpireVo(cacheName, key, expire);
setWithLock.add(expireVo);
public static void add(String cacheName, String key, Serializable value, long expire) {
ExpireVo expireVo = new ExpireVo(cacheName, key, value, expire);
redisExpireVoQueue.offer(expireVo);
}
public static void start() {
@ -42,27 +39,31 @@ public class RedisExpireUpdateTask {
new Thread(new Runnable() {
@Override
public void run() {
List<ExpireVo> l2Datas = new ArrayList<ExpireVo>();
while (true) {
WriteLock writeLock = setWithLock.getLock().writeLock();
writeLock.lock();
try {
Set<ExpireVo> set = setWithLock.getObj();
for (ExpireVo expireVo : set) {
log.debug("更新缓存过期时间, cacheName:{}, key:{}, expire:{}", expireVo.getCacheName(), expireVo.getKey(), expireVo.getExpire());
Serializable value = RedisCacheManager.getCache(expireVo.getCacheName()).get(expireVo.getKey());
if(value != null)
JedisTemplate.me().set(expireVo.getCacheName()+":"+expireVo.getKey(), value, Integer.parseInt(expireVo.getExpire()+""));
if(l2Datas.size() == 2000 || redisExpireVoQueue.isEmpty()){//2000一提交(防止频繁访问Redis网络I/O消耗压力)
for (ExpireVo expireVo : l2Datas) {
log.debug("更新缓存过期时间, cacheName:{}, key:{}, expire:{}", expireVo.getCacheName(), expireVo.getKey(), expireVo.getExpire());
Serializable value = expireVo.getValue();
if(value != null)
JedisTemplate.me().set(expireVo.getCacheName()+":"+expireVo.getKey(), value, Integer.parseInt(expireVo.getExpire()+""));
}
l2Datas.clear();
try {
Thread.sleep(1000 * 10);
} catch (InterruptedException e) {
log.error(e.toString(), e);
}
}
else{
ExpireVo expireVo = redisExpireVoQueue.poll();
if(expireVo != null){
l2Datas.add(expireVo);
}
}
set.clear();
} catch (Throwable e) {
log.error(e.getMessage(), e);
} finally {
writeLock.unlock();
try {
Thread.sleep(1000 * 10);
} catch (InterruptedException e) {
log.error(e.toString(), e);
}
}
}

View File

@ -24,7 +24,7 @@ public class SubRunnable implements Runnable {
}
public void run() {
log.warn("订阅 redis , chanel {} , 线程将阻塞",subChannel);
log.debug("订阅 redis , chanel {} , 线程将阻塞",subChannel);
try {
jedis = JedisTemplate.me().getJedis();
if(jedis != null){