hugegraph 图数据库graph创建(三)
in with 0 comment

hugegraph 图数据库graph创建(三)

in with 0 comment

在第一小节里我们看了 graph 的创建过程，这一节接着看向图中添加顶点(addVertex)时事务是如何处理的：

    /**
     * Adds a vertex via the wrapped graph, routed through
     * {@code verifyElemPermission} with {@code HugePermission.WRITE}.
     *
     * @param keyValues vertex key/value arguments, forwarded unchanged to
     *                  the wrapped graph's {@code addVertex}
     * @return whatever {@code verifyElemPermission} returns for the
     *         supplied vertex
     */
    public Vertex addVertex(Object... keyValues) {
        return verifyElemPermission(
               HugePermission.WRITE,
               () -> (HugeVertex) this.hugegraph.addVertex(keyValues));
    }
    /**
     * Builds a vertex from the given key/value arguments and adds it to
     * this transaction.
     *
     * @param keyValues vertex key/value arguments passed to
     *                  {@code constructVertex}
     * @return the result of adding the constructed vertex
     */
    public HugeVertex addVertex(Object... keyValues) {
        HugeVertex constructed = this.constructVertex(true, keyValues);
        return this.addVertex(constructed);
    }

    /**
     * Records the given vertex as an addition of this transaction.
     *
     * Any previous local removal of the same vertex id is dropped, read
     * locks are taken for the vertex label and its index labels, and the
     * vertex is stored in {@code addedVertices} between the
     * {@code beforeWrite()} and {@code afterWrite()} hooks.
     *
     * If anything in the locked section throws, the locks held by
     * {@code locksTable} are released and the throwable is rethrown.
     * NOTE(review): on the success path the locks are not released here;
     * presumably that happens on commit/rollback/reset - confirm.
     *
     * @param vertex the vertex to add; must not be marked as removed
     *               (checked only by an assertion)
     * @return the same vertex instance
     */
    @Watched("graph.addVertex-instance")
    public HugeVertex addVertex(HugeVertex vertex) {
        // NOTE(review): presumably verifies the caller is the thread that
        // owns this transaction - confirm against checkOwnerThread()
        this.checkOwnerThread();
        assert !vertex.removed();

        // Override vertices in local `removedVertices`
        this.removedVertices.remove(vertex.id()) ; // drop the vertex id from the removed set first
        try {
            this.locksTable.lockReads(LockUtil.VERTEX_LABEL_DELETE,
                                      vertex.schemaLabel().id());// read-lock the vertex label (schema) in the label-delete lock group
            this.locksTable.lockReads(LockUtil.INDEX_LABEL_DELETE,
                                      vertex.schemaLabel().indexLabels());
            // Ensure vertex label still exists from vertex-construct to lock
            this.graph().vertexLabel(vertex.schemaLabel().id());
            /*
             * No need to lock VERTEX_LABEL_ADD_UPDATE, because vertex label
             * update only can add nullable properties and user data, which is
             * unconcerned with add vertex
             */
            this.beforeWrite();
            this.addedVertices.put(vertex.id(), vertex);
            this.afterWrite();
        } catch (Throwable e){
            // Release the locks on failure, then propagate the error
            this.locksTable.unlock();
            throw e;
        }
        return vertex;
    }

事务处理的关键步骤主要有三个：beforeWrite、addedVertices.put 和 afterWrite。
先看看 beforeWrite 和 afterWrite 在做什么：

/**
 * Pre-write hook: runs the vertex and edge capacity checks of this
 * transaction, then delegates to the superclass hook.
 */
protected void beforeWrite() {
        this.checkTxVerticesCapacity(); // check the vertex capacity of this transaction
        this.checkTxEdgesCapacity();// check the edge capacity of this transaction

        super.beforeWrite(); // let the parent class do its own handling
    }
    /**
     * Post-write hook: calls {@code commitOrRollback()} immediately when
     * auto-commit is enabled, otherwise does nothing.
     */
    protected void afterWrite() {
        if (!this.autoCommit()) {
            // Manual-commit mode: leave the pending changes in place
            return;
        }
        this.commitOrRollback();
    }

跟踪调用链可以看到，最终会走到 AbstractTransaction 的 commit 方法：

/**
 * Commits the pending changes of this transaction to the backend.
 *
 * Returns without doing anything when a commit is already in progress or
 * when there is nothing to commit. When the graph provides a write rate
 * limiter, permits for the current mutation size are acquired first.
 * After the backend commit attempt the committing flag is cleared and the
 * transaction is reset, whether or not the attempt succeeded.
 *
 * @throws BackendException if the transaction has already been closed
 */
public void commit() throws BackendException {
        LOG.debug("Transaction commit() [auto: {}]...", this.autoCommit);
        this.checkOwnerThread();

        if (this.closed) {
            throw new BackendException("Transaction has been closed");
        }

        // A commit triggered from inside a running commit is ignored
        if (this.committing) {
            return;
        }

        if (!this.hasUpdate()) {
            LOG.debug("Transaction has no data to commit({})", store());
            return;
        }

        // Throttle writes when a rate limiter is configured
        RateLimiter limiter = this.graph.writeRateLimiter();
        if (limiter != null) {
            int items = this.mutationSize();
            double waited = 0.0;
            if (items > 0) {
                waited = limiter.acquire(items);
            }
            if (waited > 0) {
                LOG.debug("Waited for {}s to mutate {} item(s)", waited, items);
            }
            BackendEntryIterator.checkInterrupted();
        }

        // Perform the actual commit
        assert !this.committing : "Not allowed to commit when it's committing";
        this.committing = true;
        try {
            this.commit2Backend();
        } finally {
            // Always leave the committing state and clear pending data
            this.committing = false;
            this.reset();
        }
    }

关键在于 commit2Backend 和 reset：

/**
 * Prepares the mutation for this commit and hands it to
 * {@code commitMutation2Backend}. The prepared mutation is expected to be
 * non-empty (checked only by an assertion).
 */
protected void commit2Backend() {
        BackendMutation prepared = this.prepareCommit();
        assert !prepared.isEmpty();
        this.commitMutation2Backend(prepared);
    }
    /**
     * Applies the given mutations to the store inside one store-level
     * transaction: begin, mutate each in order, then commit.
     *
     * The {@code committing2Backend} flag is set for the duration of the
     * call and is only cleared after a successful store commit; if an
     * exception escapes, the flag stays set and the caller is expected to
     * roll back.
     *
     * @param mutations one or more mutations to apply (at least one,
     *                  checked only by an assertion)
     */
    protected void commitMutation2Backend(BackendMutation... mutations) {
        assert mutations.length > 0;
        this.committing2Backend = true;

        // If an exception occurred, catch in the upper layer and rollback
        this.store.beginTx();
        for (int i = 0; i < mutations.length; i++) {
            this.store.mutate(mutations[i]);
        }
        this.store.commitTx();

        this.committing2Backend = false;
    }
        /**
         * Default commit-preparation hook: logs the call and returns the
         * current mutation unchanged. Sub-classes override this to
         * prepare their own data before the commit.
         *
         * @return the mutation of this transaction
         */
        protected BackendMutation prepareCommit() {
        // For sub-class preparing data, nothing to do here
        LOG.debug("Transaction prepareCommit()...");
        return this.mutation();
    }

可以看到 AbstractTransaction 中的 prepareCommit 只是一个默认实现(直接返回 mutation)，具体的数据准备留给子类完成。那么子类具体是怎么实现的呢？

AbstractTransaction->GraphTransaction->CachedGraphTransaction
其中 CachedGraphTransaction 在 GraphTransaction 的基础上利用 LRU 算法做了缓存。
我们具体看看 GraphTransaction 是如何实现 prepareCommit 和 commit 的。

/**
 * Turns the pending graph changes of this transaction into backend
 * mutations, in the order: deletions, property updates, additions.
 * Each step is skipped when its pending collections are empty.
 *
 * @return the mutation of this transaction after preparation
 */
protected BackendMutation prepareCommit() {
        // Removed vertices/edges go into super.deletions
        boolean hasRemovals = this.removedVertices.size() > 0 ||
                              this.removedEdges.size() > 0;
        if (hasRemovals) {
            this.prepareDeletions(this.removedVertices, this.removedEdges);
        }

        // Added/removed properties become updates
        boolean hasPropChanges = this.addedProps.size() > 0 ||
                                 this.removedProps.size() > 0;
        if (hasPropChanges) {
            this.prepareUpdates(this.addedProps, this.removedProps);
        }

        // Added vertices/edges go into super.additions
        boolean hasAdditions = this.addedVertices.size() > 0 ||
                               this.addedEdges.size() > 0;
        if (hasAdditions) {
            this.prepareAdditions(this.addedVertices, this.addedEdges);
        }

        return this.mutation();
    }

这里做了三类操作：remove、update 和 add。下面来看看 add 操作：

/**
 * Serializes the added vertices and edges into insert entries and
 * updates their indexes through the index transaction.
 *
 * Vertices are handled first, then edges. An edge is written twice, once
 * as-is and once with its owner switched. Edges for which
 * {@code removingEdgeOwner} returns true are marked committed but
 * otherwise skipped.
 *
 * @param addedVertices vertices added in this transaction, keyed by id
 * @param addedEdges    edges added in this transaction, keyed by id
 */
protected void prepareAdditions(Map<Id, HugeVertex> addedVertices,
                                    Map<Id, HugeEdge> addedEdges) {
        if (this.checkCustomVertexExist) {
            this.checkVertexExistIfCustomizedId(addedVertices);
        }

        // Vertices: validate, insert the entry, then update indexes
        for (HugeVertex vertex : addedVertices.values()) {
            assert !vertex.removed();
            vertex.committed();
            this.checkAggregateProperty(vertex);
            // Non-null property check is skipped while in loading mode
            if (!this.graphMode().loading()) {
                this.checkNonnullProperty(vertex);
            }

            // Vertex entry
            this.doInsert(this.serializer.writeVertex(vertex));
            // Vertex indexes (only include props) and label index
            this.indexTx.updateVertexIndex(vertex, false);
            this.indexTx.updateLabelIndex(vertex, false);
        }

        // Edges: validate, insert both directions, then update indexes
        for (HugeEdge edge : addedEdges.values()) {
            assert !edge.removed();
            edge.committed();
            // Only write edges whose owner is not being removed
            if (!this.removingEdgeOwner(edge)) {
                this.checkAggregateProperty(edge);
                // Edge entries of OUT and IN
                this.doInsert(this.serializer.writeEdge(edge));
                this.doInsert(this.serializer.writeEdge(edge.switchOwner()));
                // Edge index and label index
                this.indexTx.updateEdgeIndex(edge, false);
                this.indexTx.updateLabelIndex(edge, false);
            }
        }
    }

程序依次取出每个待添加的 vertex，然后执行以下几个操作：

doInsert
updateVertexIndex
updateLabelIndex

最终你会发现，这几个操作都会被放到 mutation 中，最后交给 HBase 等后端存储处理。

Responses