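In the first section we looked at how a graph is created. This section follows what happens when a vertex is added, starting from addVertex.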
public Vertex addVertex(Object... keyValues) {
    return verifyElemPermission(HugePermission.WRITE, () -> {
        return (HugeVertex) this.hugegraph.addVertex(keyValues);
    });
}

public HugeVertex addVertex(Object... keyValues) {
    return this.addVertex(this.constructVertex(true, keyValues));
}

@Watched("graph.addVertex-instance")
public HugeVertex addVertex(HugeVertex vertex) {
    this.checkOwnerThread(); // Must be called from the transaction's owner thread
    assert !vertex.removed();
    // Override vertices in local `removedVertices`
    this.removedVertices.remove(vertex.id()); // First drop this vertex id from the pending removals
    try {
        this.locksTable.lockReads(LockUtil.VERTEX_LABEL_DELETE,
                                  vertex.schemaLabel().id()); // Lock the schema (vertex label) against deletion
        this.locksTable.lockReads(LockUtil.INDEX_LABEL_DELETE,
                                  vertex.schemaLabel().indexLabels());
        // Ensure vertex label still exists from vertex-construct to lock
        this.graph().vertexLabel(vertex.schemaLabel().id());
        /*
         * No need to lock VERTEX_LABEL_ADD_UPDATE, because vertex label
         * update only can add nullable properties and user data, which is
         * unconcerned with add vertex
         */
        this.beforeWrite();
        this.addedVertices.put(vertex.id(), vertex);
        this.afterWrite();
    } catch (Throwable e) {
        this.locksTable.unlock();
        throw e;
    }
    return vertex;
}
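To make the buffering idea concrete, here is a minimal sketch of a transaction that only stages vertices in memory, the way addedVertices and removedVertices are used above. The class VertexBufferSketch and its fields are hypothetical stand-ins written for this article, not HugeGraph classes, and locking is left out entirely.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a transaction's in-memory write buffer.
class VertexBufferSketch {
    // Vertices staged for insertion, keyed by id (plays the role of addedVertices).
    final Map<String, String> added = new LinkedHashMap<>();
    // Vertices staged for deletion, keyed by id (plays the role of removedVertices).
    final Map<String, String> removed = new LinkedHashMap<>();

    // Staging an add first cancels any pending removal of the same id,
    // then records the vertex. Nothing is written to storage here.
    void addVertex(String id, String label) {
        this.removed.remove(id);
        this.added.put(id, label);
    }

    public static void main(String[] args) {
        VertexBufferSketch tx = new VertexBufferSketch();
        tx.removed.put("v1", "person"); // pretend v1 was removed earlier in this transaction
        tx.addVertex("v1", "person");   // re-adding v1 overrides that removal
        System.out.println("added=" + tx.added + ", removed=" + tx.removed);
    }
}
```

The point of the sketch is only that addVertex touches two maps and nothing else; the real write to the backend happens later, at commit time.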
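The transaction does its key work in three places: beforeWrite, addedVertices.put, and afterWrite.
Let's first look at what beforeWrite does, along with its counterpart afterWrite.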
protected void beforeWrite() {
    this.checkTxVerticesCapacity(); // Check the transaction's vertex capacity
    this.checkTxEdgesCapacity();    // Check the transaction's edge capacity
    super.beforeWrite();            // Delegate to the parent class
}

protected void afterWrite() {
    if (this.autoCommit()) {
        this.commitOrRollback();
    }
}
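The two hooks can be sketched as follows: a capacity check before each write, and an optional commit right after it. Everything here (AutoCommitSketch, the single capacity field, the list standing in for the backend) is an assumption made for illustration; the real capacity checks and commitOrRollback are not shown in this article.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; names and limits are illustrative, not HugeGraph's.
class AutoCommitSketch {
    private final int capacity;
    private final boolean autoCommit;
    final List<String> pending = new ArrayList<>(); // staged, not yet committed
    final List<String> backend = new ArrayList<>(); // stands in for the storage backend

    AutoCommitSketch(int capacity, boolean autoCommit) {
        this.capacity = capacity;
        this.autoCommit = autoCommit;
    }

    // Reject the write up front if the buffer is already full.
    private void beforeWrite() {
        if (this.pending.size() >= this.capacity) {
            throw new IllegalStateException("Transaction capacity exceeded: " + this.capacity);
        }
    }

    // In auto-commit mode every single write is flushed immediately.
    private void afterWrite() {
        if (this.autoCommit) {
            this.commit();
        }
    }

    void commit() {
        this.backend.addAll(this.pending);
        this.pending.clear();
    }

    void add(String id) {
        this.beforeWrite();
        this.pending.add(id);
        this.afterWrite();
    }

    public static void main(String[] args) {
        AutoCommitSketch tx = new AutoCommitSketch(1, true);
        tx.add("v1");
        tx.add("v2"); // fits, because v1 was already flushed by auto-commit
        System.out.println("backend=" + tx.backend + ", pending=" + tx.pending);
    }
}
```

With auto-commit off, the same two calls would hit the capacity check on the second add, which is why the check sits before the write rather than after it.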
Following the call chain, we eventually arrive at the commit method of AbstractTransaction:
public void commit() throws BackendException {
    LOG.debug("Transaction commit() [auto: {}]...", this.autoCommit);
    this.checkOwnerThread();
    if (this.closed) {
        throw new BackendException("Transaction has been closed");
    }
    if (this.committing) {
        // It is not allowed to recursively commit in a transaction
        return;
    }
    if (!this.hasUpdate()) {
        LOG.debug("Transaction has no data to commit({})", store());
        return;
    }
    // Do rate limit if needed
    RateLimiter rateLimiter = this.graph.writeRateLimiter();
    if (rateLimiter != null) {
        int size = this.mutationSize();
        double time = size > 0 ? rateLimiter.acquire(size) : 0.0;
        if (time > 0) {
            LOG.debug("Waited for {}s to mutate {} item(s)", time, size);
        }
        BackendEntryIterator.checkInterrupted();
    }
    // Do commit
    assert !this.committing : "Not allowed to commit when it's committing";
    this.committing = true;
    try {
        this.commit2Backend();
    } finally {
        this.committing = false;
        this.reset();
    }
}
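The control flow of commit boils down to three guards plus a try/finally. The sketch below keeps only that skeleton and leaves out rate limiting and thread checks. CommitGuardSketch, its counter, and the Runnable standing in for the backend write are all hypothetical.

```java
// Hypothetical sketch of the commit skeleton; not HugeGraph code.
class CommitGuardSketch {
    private boolean committing = false;
    private int pending = 0;
    private final Runnable backendWrite; // stands in for commit2Backend()

    CommitGuardSketch(Runnable backendWrite) {
        this.backendWrite = backendWrite;
    }

    void stage() {
        this.pending++;
    }

    boolean committing() {
        return this.committing;
    }

    int pending() {
        return this.pending;
    }

    void commit() {
        if (this.committing) {
            return; // a commit is already in progress, do not recurse
        }
        if (this.pending == 0) {
            return; // nothing to commit
        }
        this.committing = true;
        try {
            this.backendWrite.run();
        } finally {
            // Runs on success and on failure alike: clear the flag
            // and drop the buffered changes.
            this.committing = false;
            this.pending = 0;
        }
    }

    public static void main(String[] args) {
        CommitGuardSketch tx = new CommitGuardSketch(() -> System.out.println("writing to backend"));
        tx.commit(); // no-op, nothing staged
        tx.stage();
        tx.commit(); // performs exactly one backend write
        System.out.println("pending after commit: " + tx.pending());
    }
}
```

Because the cleanup lives in finally, a failed backend write still leaves the transaction in a clean state; the exception itself is passed on to the caller.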
The key parts are commit2Backend and reset.
protected void commit2Backend() {
    BackendMutation mutation = this.prepareCommit();
    assert !mutation.isEmpty();
    this.commitMutation2Backend(mutation);
}

protected void commitMutation2Backend(BackendMutation... mutations) {
    assert mutations.length > 0;
    this.committing2Backend = true;
    // If an exception occurred, catch in the upper layer and rollback
    this.store.beginTx();
    for (BackendMutation mutation : mutations) {
        this.store.mutate(mutation);
    }
    this.store.commitTx();
    this.committing2Backend = false;
}

protected BackendMutation prepareCommit() {
    // For sub-class preparing data, nothing to do here
    LOG.debug("Transaction prepareCommit()...");
    return this.mutation();
}
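The comment "catch in the upper layer and rollback" describes a split of responsibilities that is easy to miss: the lower layer runs begin, mutate, commit with no try/catch, and the caller decides what to do on failure. Here is a rough sketch of that split. MemoryStore, its rollbackTx method, and the boolean-returning commitOrRollback are assumptions made for this example and do not reflect the real store API or the real commitOrRollback.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; MemoryStore is not the HugeGraph backend store API.
class StoreTxSketch {
    static class MemoryStore {
        final List<String> committed = new ArrayList<>();
        private final List<String> staged = new ArrayList<>();

        void beginTx() {
            this.staged.clear();
        }

        void mutate(String entry) {
            if (entry.isEmpty()) {
                throw new IllegalArgumentException("Empty entry");
            }
            this.staged.add(entry);
        }

        void commitTx() {
            this.committed.addAll(this.staged);
            this.staged.clear();
        }

        void rollbackTx() {
            this.staged.clear();
        }
    }

    // Lower layer: begin, mutate, commit. No error handling on purpose.
    static void commitToStore(MemoryStore store, List<String> entries) {
        store.beginTx();
        for (String entry : entries) {
            store.mutate(entry);
        }
        store.commitTx();
    }

    // Upper layer: catches the failure and rolls back the store.
    static boolean commitOrRollback(MemoryStore store, List<String> entries) {
        try {
            commitToStore(store, entries);
            return true;
        } catch (RuntimeException e) {
            store.rollbackTx();
            return false;
        }
    }

    public static void main(String[] args) {
        MemoryStore store = new MemoryStore();
        List<String> good = new ArrayList<>();
        good.add("vertex:v1");
        System.out.println("committed: " + commitOrRollback(store, good));
        List<String> broken = new ArrayList<>();
        broken.add("vertex:v2");
        broken.add(""); // triggers a failure halfway through
        System.out.println("committed: " + commitOrRollback(store, broken));
        System.out.println("store contents: " + store.committed);
    }
}
```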
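As we can see, AbstractTransaction has a prepareCommit of its own, but as its comment says, the actual preparation is left to subclasses. So how is it really implemented?
The inheritance chain is: AbstractTransaction->GraphTransaction->CachedGraphTransaction
CachedGraphTransaction adds a cache on top of the transaction, using an LRU algorithm.
Let's look in detail at how GraphTransaction implements prepareCommit and the commit logic.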
protected BackendMutation prepareCommit() {
    // Serialize and add updates into super.deletions
    if (this.removedVertices.size() > 0 || this.removedEdges.size() > 0) {
        this.prepareDeletions(this.removedVertices, this.removedEdges);
    }
    if (this.addedProps.size() > 0 || this.removedProps.size() > 0) {
        this.prepareUpdates(this.addedProps, this.removedProps);
    }
    // Serialize and add updates into super.additions
    if (this.addedVertices.size() > 0 || this.addedEdges.size() > 0) {
        this.prepareAdditions(this.addedVertices, this.addedEdges);
    }
    return this.mutation();
}
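This is a template-method arrangement: the base class drives the commit, and the subclass overrides prepareCommit to turn its buffered elements into a mutation. The sketch below shows the shape with strings instead of serialized entries. BaseTx, GraphTx, and the "DELETE"/"INSERT" strings are invented for illustration, and the property-update step is omitted.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the override relationship; not HugeGraph code.
class PrepareCommitSketch {
    static class BaseTx {
        protected final List<String> mutation = new ArrayList<>();

        // Hook for subclasses; the base class has nothing to prepare.
        protected List<String> prepareCommit() {
            return this.mutation;
        }

        // Fixed commit flow: prepare, hand over a copy, reset.
        final List<String> commit() {
            List<String> result = new ArrayList<>(this.prepareCommit());
            this.mutation.clear();
            return result;
        }
    }

    static class GraphTx extends BaseTx {
        final List<String> removedVertices = new ArrayList<>();
        final List<String> addedVertices = new ArrayList<>();

        @Override
        protected List<String> prepareCommit() {
            // Deletions are prepared before additions, as in the method above.
            for (String id : this.removedVertices) {
                this.mutation.add("DELETE " + id);
            }
            for (String id : this.addedVertices) {
                this.mutation.add("INSERT " + id);
            }
            this.removedVertices.clear();
            this.addedVertices.clear();
            return this.mutation;
        }
    }

    public static void main(String[] args) {
        GraphTx tx = new GraphTx();
        tx.addedVertices.add("v2");
        tx.removedVertices.add("v1");
        System.out.println(tx.commit());
    }
}
```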
It performs three operations: remove, update, and add. Of these, let's look at the add operation.
protected void prepareAdditions(Map<Id, HugeVertex> addedVertices,
                                Map<Id, HugeEdge> addedEdges) {
    if (this.checkCustomVertexExist) {
        this.checkVertexExistIfCustomizedId(addedVertices);
    }
    // Do vertex update
    for (HugeVertex v : addedVertices.values()) {
        assert !v.removed();
        v.committed();
        this.checkAggregateProperty(v);
        // Check whether passed all non-null properties
        if (!this.graphMode().loading()) {
            this.checkNonnullProperty(v);
        }
        // Add vertex entry
        this.doInsert(this.serializer.writeVertex(v));
        // Update index of vertex(only include props)
        this.indexTx.updateVertexIndex(v, false);
        this.indexTx.updateLabelIndex(v, false);
    }
    // Do edge update
    for (HugeEdge e : addedEdges.values()) {
        assert !e.removed();
        e.committed();
        // Skip edge if its owner has been removed
        if (this.removingEdgeOwner(e)) {
            continue;
        }
        this.checkAggregateProperty(e);
        // Add edge entry of OUT and IN
        this.doInsert(this.serializer.writeEdge(e));
        this.doInsert(this.serializer.writeEdge(e.switchOwner()));
        // Update index of edge
        this.indexTx.updateEdgeIndex(e, false);
        this.indexTx.updateLabelIndex(e, false);
    }
}
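For each added vertex in turn, the program performs:
doInsert
updateVertexIndex
updateLabelIndex
Edges go through the same steps, with updateEdgeIndex in place of updateVertexIndex. In the end you will find that all of these operations are collected into the mutation, which is finally handled by a storage backend such as HBase.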
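One detail in prepareAdditions worth spelling out is that every edge is inserted twice: once from the OUT side and once, via switchOwner, from the IN side. A small sketch of that idea follows. The Edge class, its string serialization, and prepareEdge are made up for this example; the real serializer produces backend entries, not strings.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of writing an edge from both endpoints; not HugeGraph code.
class EdgeEntrySketch {
    static final class Edge {
        final String owner; // the vertex this entry is stored under
        final String other; // the vertex at the far end
        final boolean out;  // true if the edge leaves the owner

        Edge(String owner, String other, boolean out) {
            this.owner = owner;
            this.other = other;
            this.out = out;
        }

        // Same edge, seen from the opposite endpoint.
        Edge switchOwner() {
            return new Edge(this.other, this.owner, !this.out);
        }

        String serialize() {
            return this.owner + (this.out ? " -OUT-> " : " <-IN- ") + this.other;
        }
    }

    // Collect both entries of one edge into a mutation list.
    static List<String> prepareEdge(Edge e) {
        List<String> mutation = new ArrayList<>();
        mutation.add(e.serialize());
        mutation.add(e.switchOwner().serialize());
        return mutation;
    }

    public static void main(String[] args) {
        for (String entry : prepareEdge(new Edge("a", "b", true))) {
            System.out.println(entry);
        }
    }
}
```

Storing the edge under both vertices is what lets a traversal start from either endpoint without scanning the whole edge set, at the cost of doubling the number of edge entries in each mutation.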
This article was written by 妖言君 and is licensed under the Creative Commons Attribution 4.0 International License.
Last edited: Apr 3, 2022 at 11:56 am