缓存的存在的目的在于加速查询的速度,降低数据库的请求。hugegraph的缓存是怎么实现的?
cache 中包含两部分:
RamCache:对内缓存
LevelCache:分级缓存
OffheapCache: 堆外缓存
堆内缓存是jvm创建和管理的,而相对应堆外缓存是自创建和管理的内存,比如netty中大量的nio使用的就是堆外缓存。GC算法的回收并不会对他们起作用。
java 中可以通过DirectByteBuffer 直接创建 也可亿通过byte.allocateDirect去创建。
首先来看堆内缓存:
对内缓存是通过map肯链表实现的LRU算法。
protected final Object access(Id id) {
assert id != null;
long halfCapacity = this.halfCapacity();
if (this.map.size() <= halfCapacity) { //小于一半直接取出
LinkNode<Id, Object> node = this.map.get(id);
if (node == null) {
return null;
}
assert id.equals(node.key());
return node.value();
}
final Lock lock = this.keyLock.lock(id);
try {
LinkNode<Id, Object> node = this.map.get(id);
if (node == null) {
return null;
}
// NOTE: update the queue only if the size > capacity/2
if (this.map.size() > halfCapacity) {//大于一半 那么将该节点移除后重新插入链表的头部
// Move the node from mid to tail
if (this.queue.remove(node) == null) {
// The node may be removed by others through dequeue()
return null;
}
this.queue.enqueue(node);
}
assert id.equals(node.key());
return node.value();
} finally {
lock.unlock();
}
}
@Override
@Watched(prefix = "ramcache")
protected final boolean write(Id id, Object value) {
assert id != null;
long capacity = this.capacity();
assert capacity > 0;
final Lock lock = this.keyLock.lock(id);
try {
// The cache is full
while (this.map.size() >= capacity) { //判断的map的size是否超过了容量的大小
/*
* Remove the oldest from the queue
* NOTE: it maybe return null if someone else (that's other
* threads) are doing dequeue() and the queue may be empty.
*/
LinkNode<Id, Object> removed = this.queue.dequeue();//取出队列开始的节点->最久没被访问的节点
if (removed == null) {
/*
* If at this time someone add some new items, these will
* be cleared in the map, but still stay in the queue, so
* the queue will have some more nodes than the map.
*/
this.map.clear();
break;
}
/*
* Remove the oldest from the map
* NOTE: it maybe return null if other threads are doing remove
*/
this.map.remove(removed.key()); //同时移除map中的该节点
if (LOG.isDebugEnabled()) {
LOG.debug("RamCache replaced '{}' with '{}' (capacity={})",
removed.key(), id, capacity);
}
/*
* Release the object
* NOTE: we can't reuse the removed node due to someone else
* may access the node (will do remove() -> enqueue())
*/
removed = null;
}
// Remove the old node if exists
LinkNode<Id, Object> node = this.map.get(id);
if (node != null) {
this.queue.remove(node); //移除链表上的该节点
}
// Add the new item to tail of the queue, then map it
this.map.put(id, this.queue.enqueue(id, value));
return true;
} finally {
lock.unlock();
}
}
OffHeapCache的实现是依赖于OHCache的。在数据操作的过程中需要将对象序列化之后存入堆外内存当中。
protected boolean write(Id id, Object value) {
Value serializedValue = new Value(value); //首先将value进行序列化
int serializedSize = serializedValue.serializedSize();
if (serializedSize > VALUE_SIZE_TO_SKIP) { //检查size
LOG.info("Skip to cache '{}' due to value size {} > limit {}",
id, serializedSize, VALUE_SIZE_TO_SKIP);
return false;
}
long expireTime = this.expire();
boolean success;
if (expireTime <= 0) {
success = this.cache.put(id, serializedValue);
} else {
expireTime += now(); //设置过期时间
/*
* Seems only the linked implementation support expiring entries,
* the chunked implementation does not support it.
*/
success = this.cache.put(id, serializedValue, expireTime);
}
assert success;
return success;
}
LevelCache 这个的实现相对简单 就是将多个cache组合在一起分级使用
protected Object access(Id id) {
for (AbstractCache<Id, Object> cache : this.caches) {
// Priority access to the previous level
Object value = cache.access(id);
if (value != null) {
return value;
}
}
return null;
}
@Override
protected boolean write(Id id, Object value) {
boolean success = false;
for (AbstractCache<Id, Object> cache : this.caches) {
success |= cache.write(id, value);
}
return success;
}
最后说下这个cacheManager. 负责管理这些cache 它上边有个定时任务定时检查cache上过期的key 并将其移除。
private TimerTask scheduleTimer(float period) {
TimerTask task = new TimerTask() {
@Override
public void run() {
try {
for (Entry<String, Cache<Id, Object>> entry :
caches().entrySet()) {
this.tick(entry.getKey(), entry.getValue());
}
} catch (Throwable e) {
LOG.warn("An exception occurred when running tick", e);
}
}
private void tick(String name, Cache<Id, Object> cache) {
long start = System.currentTimeMillis();
long items = cache.tick();
long cost = System.currentTimeMillis() - start;
if (cost > LOG_TICK_COST_TIME) {
LOG.info("Cache '{}' expired {} items cost {}ms > {}ms " +
"(size {}, expire {}ms)", name, items, cost,
LOG_TICK_COST_TIME, cache.size(), cache.expire());
}
LOG.debug("Cache '{}' expiration tick cost {}ms", name, cost);
}
};
// Schedule task with the period in seconds
this.timer.schedule(task, 0, (long) (period * 1000.0));
return task;
}
本文由 妖言君 创作,采用 知识共享署名4.0 国际许可协议进行许可
本站文章除注明转载/出处外,均为本站原创或翻译,转载前请务必署名
最后编辑时间为: Apr 3, 2022 at 11:58 am