HugeGraph Cache Implementation (Part 4)

A cache exists to speed up queries and reduce the number of requests that reach the backing store. So how does HugeGraph implement its caches?

The cache module contains three implementations:
RamCache: on-heap cache
LevelCache: multi-level (tiered) cache
OffheapCache: off-heap cache
On-heap memory is created and managed by the JVM, whereas off-heap memory is allocated and managed by the application itself; Netty's NIO code, for example, makes heavy use of off-heap memory. The JVM garbage collector does not manage this memory directly.
In Java, off-heap memory can be obtained through a DirectByteBuffer, typically created via ByteBuffer.allocateDirect.
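
A minimal sketch of allocating and reading back a direct (off-heap) buffer:

import java.nio.ByteBuffer;

public class DirectBufferDemo {
    public static void main(String[] args) {
        // Allocate 1 KB of off-heap memory; the backing memory lives outside
        // the Java heap, so the GC does not move or compact it.
        ByteBuffer direct = ByteBuffer.allocateDirect(1024);
        direct.putLong(42L);                   // write into off-heap memory
        direct.flip();
        System.out.println(direct.getLong());  // read it back: 42
    }
}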

Let's look at the on-heap cache first:
RamCache implements the LRU algorithm using a map together with a linked list.

    protected final Object access(Id id) {
        assert id != null;

        long halfCapacity = this.halfCapacity();
        if (this.map.size() <= halfCapacity) { // At or below half capacity: read directly without updating the LRU queue
            LinkNode<Id, Object> node = this.map.get(id);
            if (node == null) {
                return null;
            }
            assert id.equals(node.key());
            return node.value();
        }

        final Lock lock = this.keyLock.lock(id);
        try {
            LinkNode<Id, Object> node = this.map.get(id);
            if (node == null) {
                return null;
            }

            // NOTE: update the queue only if the size > capacity/2
            if (this.map.size() > halfCapacity) { // Above half capacity: unlink the node and re-insert it at the tail (most recently used)
                // Move the node from mid to tail
                if (this.queue.remove(node) == null) {
                    // The node may be removed by others through dequeue()
                    return null;
                }
                this.queue.enqueue(node);
            }

            assert id.equals(node.key());
            return node.value();
        } finally {
            lock.unlock();
        }
    }

    @Override
    @Watched(prefix = "ramcache")
    protected final boolean write(Id id, Object value) {
        assert id != null;
        long capacity = this.capacity();
        assert capacity > 0;

        final Lock lock = this.keyLock.lock(id);
        try {
            // The cache is full
            while (this.map.size() >= capacity) { // The map has reached capacity: evict least recently used entries
                /*
                 * Remove the oldest from the queue
                 * NOTE: it maybe return null if someone else (that's other
                 * threads) are doing dequeue() and the queue may be empty.
                 */
                LinkNode<Id, Object> removed = this.queue.dequeue(); // Head of the queue is the least recently accessed node
                if (removed == null) {
                    /*
                     * If at this time someone add some new items, these will
                     * be cleared in the map, but still stay in the queue, so
                     * the queue will have some more nodes than the map.
                     */
                    this.map.clear();
                    break;
                }
                /*
                 * Remove the oldest from the map
                 * NOTE: it maybe return null if other threads are doing remove
                 */
                this.map.remove(removed.key()); // Remove the evicted node from the map as well
                if (LOG.isDebugEnabled()) {
                    LOG.debug("RamCache replaced '{}' with '{}' (capacity={})",
                              removed.key(), id, capacity);
                }
                /*
                 * Release the object
                 * NOTE: we can't reuse the removed node due to someone else
                 * may access the node (will do remove() -> enqueue())
                 */
                removed = null;
            }

            // Remove the old node if exists
            LinkNode<Id, Object> node = this.map.get(id);
            if (node != null) {
                this.queue.remove(node); // Unlink the existing node from the queue before re-adding it
            }

            // Add the new item to tail of the queue, then map it
            this.map.put(id, this.queue.enqueue(id, value));
            return true;
        } finally {
            lock.unlock();
        }
    }
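
For comparison, the core map-plus-linked-list LRU idea can be sketched in a few lines with the JDK's LinkedHashMap. This is only an illustrative sketch, not how RamCache is implemented (RamCache uses its own concurrent linked queue plus per-key locks):

import java.util.LinkedHashMap;
import java.util.Map;

// Minimal LRU cache: LinkedHashMap in access-order mode keeps the least
// recently used entry at the head and evicts it once capacity is exceeded.
public class SimpleLruCache<K, V> extends LinkedHashMap<K, V> {

    private final int capacity;

    public SimpleLruCache(int capacity) {
        super(16, 0.75f, true); // accessOrder = true
        this.capacity = capacity;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        // Called after each put(); evict the eldest entry when over capacity
        return this.size() > this.capacity;
    }
}

For example, with capacity 2, after put("a"), put("b"), get("a"), put("c"), the evicted key is "b", because "a" was accessed more recently.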

OffheapCache is built on top of OHC (OHCache). On every read and write the value object has to be serialized into (or deserialized out of) off-heap memory.

    protected boolean write(Id id, Object value) {
        // Wrap the value so it can be serialized into off-heap memory
        Value serializedValue = new Value(value);
        int serializedSize = serializedValue.serializedSize();
        // Skip caching values whose serialized size exceeds the limit
        if (serializedSize > VALUE_SIZE_TO_SKIP) {
            LOG.info("Skip to cache '{}' due to value size {} > limit {}",
                      id, serializedSize, VALUE_SIZE_TO_SKIP);
            return false;
        }
        long expireTime = this.expire();
        boolean success;
        if (expireTime <= 0) {
            success = this.cache.put(id, serializedValue);
        } else {
            // Convert the relative TTL into an absolute expiry timestamp
            expireTime += now();
            /*
             * Seems only the linked implementation support expiring entries,
             * the chunked implementation does not support it.
             */
            success = this.cache.put(id, serializedValue, expireTime);
        }
        assert success;
        return success;
    }
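
Outside of HugeGraph, OHC itself is used roughly as follows. This is a hedged sketch based on OHC's builder and CacheSerializer API; exact method names and signatures may vary between OHC versions:

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import org.caffinitas.ohc.CacheSerializer;
import org.caffinitas.ohc.OHCache;
import org.caffinitas.ohc.OHCacheBuilder;

public class OhcDemo {

    // OHC stores entries off-heap, so keys and values both need serializers
    static final CacheSerializer<String> STRING_SERIALIZER = new CacheSerializer<String>() {
        @Override
        public void serialize(String value, ByteBuffer buf) {
            byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
            buf.putInt(bytes.length);
            buf.put(bytes);
        }

        @Override
        public String deserialize(ByteBuffer buf) {
            byte[] bytes = new byte[buf.getInt()];
            buf.get(bytes);
            return new String(bytes, StandardCharsets.UTF_8);
        }

        @Override
        public int serializedSize(String value) {
            return 4 + value.getBytes(StandardCharsets.UTF_8).length;
        }
    };

    public static void main(String[] args) throws Exception {
        OHCache<String, String> cache = OHCacheBuilder.<String, String>newBuilder()
                .keySerializer(STRING_SERIALIZER)
                .valueSerializer(STRING_SERIALIZER)
                .capacity(64 * 1024 * 1024) // 64 MB of off-heap memory
                .build();
        cache.put("vertex:1", "Alice");
        System.out.println(cache.get("vertex:1")); // Alice
        cache.close();
    }
}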

LevelCache is comparatively simple: it composes several caches and uses them as successive levels.

    protected Object access(Id id) {
        for (AbstractCache<Id, Object> cache : this.caches) {
            // Priority access to the previous level
            Object value = cache.access(id);
            if (value != null) {
                return value;
            }
        }
        return null;
    }

    @Override
    protected boolean write(Id id, Object value) {
        boolean success = false;
        for (AbstractCache<Id, Object> cache : this.caches) {
            success |= cache.write(id, value);
        }
        return success;
    }
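
Note that access() returns on the first hit and, as shown here, does not copy the value back into the higher levels. A minimal, self-contained illustration of this priority-read pattern, using plain maps rather than the real LevelCache API:

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LevelLookupDemo {
    public static void main(String[] args) {
        // Two levels: a small "L1" and a larger "L2"; lookups try L1 first
        Map<String, String> l1 = new HashMap<>();
        Map<String, String> l2 = new HashMap<>();
        List<Map<String, String>> levels = Arrays.asList(l1, l2);

        // The key only lives in L2, e.g. after being evicted from L1
        l2.put("vertex:1", "Alice");

        String value = null;
        for (Map<String, String> level : levels) {
            value = level.get("vertex:1");
            if (value != null) {
                break; // first hit wins; lower levels are not consulted
            }
        }
        System.out.println(value); // Alice
    }
}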

Finally, the CacheManager. It is responsible for managing all these caches and runs a scheduled task that periodically checks every cache for expired keys and removes them.

    private TimerTask scheduleTimer(float period) {
        TimerTask task = new TimerTask() {
            @Override
            public void run() {
                try {
                    for (Entry<String, Cache<Id, Object>> entry :
                         caches().entrySet()) {
                        this.tick(entry.getKey(), entry.getValue());
                    }
                } catch (Throwable e) {
                    LOG.warn("An exception occurred when running tick", e);
                }
            }

            private void tick(String name, Cache<Id, Object> cache) {
                long start = System.currentTimeMillis();
                long items = cache.tick();
                long cost = System.currentTimeMillis() - start;
                if (cost > LOG_TICK_COST_TIME) {
                    LOG.info("Cache '{}' expired {} items cost {}ms > {}ms " +
                             "(size {}, expire {}ms)", name, items, cost,
                             LOG_TICK_COST_TIME, cache.size(), cache.expire());
                }
                LOG.debug("Cache '{}' expiration tick cost {}ms", name, cost);
            }
        };

        // Schedule task with the period in seconds
        this.timer.schedule(task, 0, (long) (period * 1000.0));

        return task;
    }
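
Conceptually, each cache's tick() scans its entries and drops the ones whose write time is older than the expire interval. A simplified, self-contained sketch of that idea (the class and method names here are illustrative, not HugeGraph's actual implementation), using a ScheduledExecutorService instead of a Timer:

import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ExpiringCache<K, V> {

    private static class Entry<V> {
        final V value;
        final long writeTime = System.currentTimeMillis();
        Entry(V value) { this.value = value; }
    }

    private final Map<K, Entry<V>> map = new ConcurrentHashMap<>();
    private final long expireMs;

    public ExpiringCache(long expireMs, ScheduledExecutorService scheduler, long periodMs) {
        this.expireMs = expireMs;
        // Periodic "tick": scan all entries and remove the expired ones
        scheduler.scheduleAtFixedRate(this::tick, periodMs, periodMs, TimeUnit.MILLISECONDS);
    }

    public void put(K key, V value) {
        this.map.put(key, new Entry<>(value));
    }

    public V get(K key) {
        Entry<V> entry = this.map.get(key);
        return entry == null ? null : entry.value;
    }

    // Remove every entry older than expireMs and return how many were dropped
    public long tick() {
        long now = System.currentTimeMillis();
        long expired = 0;
        Iterator<Entry<V>> it = this.map.values().iterator();
        while (it.hasNext()) {
            if (now - it.next().writeTime > this.expireMs) {
                it.remove();
                expired++;
            }
        }
        return expired;
    }
}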