HugeGraph 支持多种后端存储,包括 MySQL、PostgreSQL、RocksDB 等。那么它是怎么把数据写入这些存储当中的呢?
其实前面我们提到过,图的点、边和 schema 都会映射成 BackendEntry,其表模型为 HugeType + BackendColumn。
我们先回顾一下之前事务中的 prepareAdditions:
/**
 * Turns the vertices and edges added in this transaction into backend
 * insertions and updates the related index entries.
 *
 * @param addedVertices the added vertices, keyed by id
 * @param addedEdges    the added edges, keyed by id
 */
protected void prepareAdditions(Map<Id, HugeVertex> addedVertices,
                                Map<Id, HugeEdge> addedEdges) {
    if (this.checkCustomVertexExist) {
        this.checkVertexExistIfCustomizedId(addedVertices);
    }

    // Vertices: validate, serialize + insert, then update indexes
    for (HugeVertex vertex : addedVertices.values()) {
        assert !vertex.removed();
        vertex.committed();
        this.checkAggregateProperty(vertex);
        // The non-null property check is skipped when graphMode().loading()
        if (!this.graphMode().loading()) {
            this.checkNonnullProperty(vertex);
        }
        // Serialize the vertex to a backend entry and queue the insertion
        this.doInsert(this.serializer.writeVertex(vertex));
        // Vertex index (properties only) and label index
        this.indexTx.updateVertexIndex(vertex, false);
        this.indexTx.updateLabelIndex(vertex, false);
    }

    // Edges: same flow, except edges whose owner is being removed
    for (HugeEdge edge : addedEdges.values()) {
        assert !edge.removed();
        edge.committed();
        if (!this.removingEdgeOwner(edge)) {
            this.checkAggregateProperty(edge);
            // Two entries per edge: the edge itself and its switched-owner
            // counterpart (OUT and IN)
            this.doInsert(this.serializer.writeEdge(edge));
            this.doInsert(this.serializer.writeEdge(edge.switchOwner()));
            // Edge index and label index
            this.indexTx.updateEdgeIndex(edge, false);
            this.indexTx.updateLabelIndex(edge, false);
        }
    }
}
可以看到,在 doInsert 之前首先通过 serializer 对数据进行了序列化。这里用的其实是 AbstractSerializer,这个类有很多实现类。
我们先看看这个类所实现的 vertex、edge 和 schema 的序列化接口:
/**
 * Conversions between graph elements (vertices, edges, indexes, ids,
 * queries) and their {@code BackendEntry} / backend query form.
 */
public interface GraphSerializer {

    // ---- vertex ----

    /** Converts a vertex into a backend entry. */
    BackendEntry writeVertex(HugeVertex vertex);

    /** Converts a single vertex property into a backend entry. */
    BackendEntry writeVertexProperty(HugeVertexProperty<?> prop);

    /** Rebuilds a vertex of the given graph from a backend entry. */
    HugeVertex readVertex(HugeGraph graph, BackendEntry entry);

    // ---- edge ----

    /** Converts an edge into a backend entry. */
    BackendEntry writeEdge(HugeEdge edge);

    /** Converts a single edge property into a backend entry. */
    BackendEntry writeEdgeProperty(HugeEdgeProperty<?> prop);

    /** Rebuilds an edge of the given graph from a backend entry. */
    HugeEdge readEdge(HugeGraph graph, BackendEntry entry);

    // ---- index ----

    /** Converts an index into a backend entry. */
    BackendEntry writeIndex(HugeIndex index);

    /** Rebuilds an index from a backend entry, given the originating query. */
    HugeIndex readIndex(HugeGraph graph, ConditionQuery query,
                        BackendEntry entry);

    // ---- id / query ----

    /** Converts an id of the given type into a backend entry. */
    BackendEntry writeId(HugeType type, Id id);

    /** Converts a query into the form used by the backend. */
    Query writeQuery(Query query);
}
/**
 * Conversions between schema elements (vertex labels, edge labels,
 * property keys, index labels) and {@code BackendEntry}.
 * Each schema type has a symmetric write/read pair.
 */
public interface SchemaSerializer {

    /** Converts a vertex label into a backend entry. */
    BackendEntry writeVertexLabel(VertexLabel vertexLabel);

    /** Rebuilds a vertex label of the given graph from a backend entry. */
    VertexLabel readVertexLabel(HugeGraph graph, BackendEntry entry);

    /** Converts an edge label into a backend entry. */
    BackendEntry writeEdgeLabel(EdgeLabel edgeLabel);

    /** Rebuilds an edge label of the given graph from a backend entry. */
    EdgeLabel readEdgeLabel(HugeGraph graph, BackendEntry entry);

    /** Converts a property key into a backend entry. */
    BackendEntry writePropertyKey(PropertyKey propertyKey);

    /** Rebuilds a property key of the given graph from a backend entry. */
    PropertyKey readPropertyKey(HugeGraph graph, BackendEntry entry);

    /** Converts an index label into a backend entry. */
    BackendEntry writeIndexLabel(IndexLabel indexLabel);

    /** Rebuilds an index label of the given graph from a backend entry. */
    IndexLabel readIndexLabel(HugeGraph graph, BackendEntry entry);
}
我们来看 AbstractSerializer 的一个具体实现类:BinarySerializer。
/**
 * Serializes a vertex into a binary backend entry holding one column.
 *
 * The column value is written in this order: the vertex label id, all
 * properties, and (only when the vertex has a TTL) the expired time.
 * The column name is the entry id bytes when {@code keyWithIdPrefix} is
 * set, otherwise {@code EMPTY_BYTES}.
 *
 * @param vertex the vertex to serialize
 * @return the backend entry; for a removed vertex it is returned as
 *         created, without writing a column here
 */
public BackendEntry writeVertex(HugeVertex vertex) {
    BinaryBackendEntry entry = newBackendEntry(vertex);
    if (vertex.removed()) {
        return entry;
    }

    // Allocation size: 8 bytes plus 16 bytes per property
    BytesBuffer bytes = BytesBuffer.allocate(
                        8 + 16 * vertex.getProperties().size());

    // Label id first, then every property of the vertex
    bytes.writeId(vertex.schemaLabel().id());
    this.formatProperties(vertex.getProperties().values(), bytes);

    // Expired time is appended only for vertices with a TTL
    if (vertex.hasTtl()) {
        entry.ttl(vertex.ttl());
        this.formatExpiredTime(vertex.expiredTime(), bytes);
    }

    // Choose the column name, then store the serialized bytes under it
    byte[] columnName;
    if (this.keyWithIdPrefix) {
        columnName = entry.id().asBytes();
    } else {
        columnName = EMPTY_BYTES;
    }
    entry.column(columnName, bytes.bytes());

    return entry;
}
可以看到,它序列化成二进制时的格式是:
label_id(1B)
prop_total_size(属性的总个数)
prop_schema_id + prop_val()(2B)
如果是 HBase 的话,还会有一个 rowkey。
如果用的是 TableSerializer,那么实现如下:
/**
 * Serializes a vertex into a table-style backend entry.
 *
 * Columns are set in this order: EXPIRED_TIME (only when the vertex has
 * a TTL), ID, LABEL, and then whatever {@code formatProperties} adds to
 * the entry row.
 *
 * @param vertex the vertex to serialize
 * @return the populated backend entry
 */
public BackendEntry writeVertex(HugeVertex vertex) {
    TableBackendEntry tableEntry = newBackendEntry(vertex);

    // TTL and expired time are recorded only for vertices with a TTL
    if (vertex.hasTtl()) {
        tableEntry.ttl(vertex.ttl());
        tableEntry.column(HugeKeys.EXPIRED_TIME, vertex.expiredTime());
    }

    // Identity columns: vertex id and the label id as a long
    tableEntry.column(HugeKeys.ID, this.writeId(vertex.id()));
    tableEntry.column(HugeKeys.LABEL,
                      vertex.schemaLabel().id().asLong());

    // Properties are written straight into the entry row
    this.formatProperties(vertex, tableEntry.row());

    return tableEntry;
}
MySQL 中 formatProperties 的实现:
/**
 * Stores all properties of an element in the PROPERTIES column of the
 * row, as one JSON string mapping property-key id to property value.
 *
 * @param element the element whose properties are written
 * @param row     the row that receives the PROPERTIES column
 */
protected void formatProperties(HugeElement element,
                                TableBackendEntry.Row row) {
    // Property-key id (as long) -> property value
    Map<Number, Object> valuesByKeyId = new HashMap<>();
    for (HugeProperty<?> property : element.getProperties().values()) {
        Number keyId = property.propertyKey().id().asLong();
        valuesByKeyId.put(keyId, property.value());
    }

    String json = JsonUtil.toJson(valuesByKeyId);
    row.column(HugeKeys.PROPERTIES, json);
}
可以看到返回的是 BackendEntry,其中主要包含 id、label,以及以 JSON 字符串形式存放在 entry row 中的 properties。
真正提交的时候,entry 会被加入 mutation 中,最终由 MySQL 存储执行:
/**
 * Applies one backend action to the MySQL table matching the type of
 * the action's entry.
 *
 * @param session the session passed through to the table operation
 * @param item    the action (INSERT / DELETE / APPEND / ELIMINATE) and
 *                the entry it applies to
 * @throws AssertionError if the action is none of the four above
 */
private void mutate(Session session, BackendAction item) {
    MysqlBackendEntry mysqlEntry = castBackendEntry(item.entry());
    // The entry type selects the target table
    MysqlTable target = this.table(mysqlEntry.type());

    // Each action maps to the table operation of the same name
    switch (item.action()) {
        case INSERT:
            target.insert(session, mysqlEntry.row());
            break;
        case DELETE:
            target.delete(session, mysqlEntry.row());
            break;
        case APPEND:
            target.append(session, mysqlEntry.row());
            break;
        case ELIMINATE:
            target.eliminate(session, mysqlEntry.row());
            break;
        default:
            String message = String.format(
                             "Unsupported mutate action: %s", item.action());
            throw new AssertionError(message);
    }
}
/**
 * Builds the insert statement for a row, binds its values and adds the
 * statement to the session.
 *
 * @param session the session that prepares the statement and receives it
 * @param entry   the row to insert
 * @throws BackendException if preparing the statement or binding a
 *                          value fails with an SQLException
 */
public void insert(Session session, MysqlBackendEntry.Row entry) {
    String template = this.buildInsertTemplate(entry);

    PreparedStatement insertStmt;
    try {
        // Create or get insert prepare statement
        insertStmt = session.prepareStatement(template);
        // JDBC parameter indexes are 1-based
        int i = 1;
        for (Object object : this.buildInsertObjects(entry)) {
            insertStmt.setObject(i++, object);
        }
    } catch (SQLException e) {
        // Keep the SQLException as the cause so the driver error is not
        // lost, and separate the two message parts with a space
        // (previously rendered as "'...'for entry").
        // NOTE(review): relies on the
        // BackendException(String, Throwable, Object...) overload - confirm
        throw new BackendException("Failed to prepare statement '%s' " +
                                   "for entry: %s", e, template, entry);
    }
    session.add(insertStmt);
}
其实就是将 entry 转换成 MySQL 语句的操作。下面再看 HBase 的 insert:
/**
 * Writes a backend entry with a single put: the entry id bytes are the
 * row key, and all columns of the entry go under column family CF.
 *
 * @param session the session that performs the put
 * @param entry   the entry to write; must have at least one column
 */
@Override
public void insert(Session session, BackendEntry entry) {
    // Precondition: the entry carries at least one column
    assert !entry.columns().isEmpty();

    session.put(this.table(),
                CF,
                entry.id().asBytes(),
                entry.columns());
}
HBase 则更简单,直接一个 put 就搞定了。
本文由 妖言君 创作,采用 知识共享署名4.0 国际许可协议进行许可
本站文章除注明转载/出处外,均为本站原创或翻译,转载前请务必署名
最后编辑时间为: Apr 3, 2022 at 11:58 am