hugegraph之数据存储模型(五)
in with 0 comment

hugegraph之数据存储模型(五)

in with 0 comment

hugegraph支持多种存储包括mysql,postgresql,rockdb 等。那么他是怎么写入这些存储当中的呢。
其实前边我们有提到图的点边schema都会映射成BackendEntry。表的模型HugeType + BackendColumn

在之前的事务中我们回顾下prepareAddition

protected void prepareAdditions(Map<Id, HugeVertex> addedVertices,
                                    Map<Id, HugeEdge> addedEdges) {
        if (this.checkCustomVertexExist) {
            this.checkVertexExistIfCustomizedId(addedVertices);
        }
        // Do vertex update
        for (HugeVertex v : addedVertices.values()) {
            assert !v.removed();
            v.committed();
            this.checkAggregateProperty(v);
            // Check whether passed all non-null properties
            if (!this.graphMode().loading()) {
                this.checkNonnullProperty(v);
            }

            // Add vertex entry
            this.doInsert(this.serializer.writeVertex(v));
            // Update index of vertex(only include props)
            this.indexTx.updateVertexIndex(v, false);
            this.indexTx.updateLabelIndex(v, false);
        }

        // Do edge update
        for (HugeEdge e : addedEdges.values()) {
            assert !e.removed();
            e.committed();
            // Skip edge if its owner has been removed
            if (this.removingEdgeOwner(e)) {
                continue;
            }
            this.checkAggregateProperty(e);
            // Add edge entry of OUT and IN
            this.doInsert(this.serializer.writeEdge(e));
            this.doInsert(this.serializer.writeEdge(e.switchOwner()));
            // Update index of edge
            this.indexTx.updateEdgeIndex(e, false);
            this.indexTx.updateLabelIndex(e, false);
        }
    }

我们看到doInsert 中首先对数据进行了序列化.我们看到这里其实用的AbstractSerializer 。这个类有很多实现类。
我们看看这个类实现了vertex edge 和schema 的序列化

public interface GraphSerializer {

    public BackendEntry writeVertex(HugeVertex vertex);
    public BackendEntry writeVertexProperty(HugeVertexProperty<?> prop);
    public HugeVertex readVertex(HugeGraph graph, BackendEntry entry);

    public BackendEntry writeEdge(HugeEdge edge);
    public BackendEntry writeEdgeProperty(HugeEdgeProperty<?> prop);
    public HugeEdge readEdge(HugeGraph graph, BackendEntry entry);

    public BackendEntry writeIndex(HugeIndex index);
    public HugeIndex readIndex(HugeGraph graph, ConditionQuery query,
                               BackendEntry entry);

    public BackendEntry writeId(HugeType type, Id id);
    public Query writeQuery(Query query);
}
public interface SchemaSerializer {

    public BackendEntry writeVertexLabel(VertexLabel vertexLabel);
    public VertexLabel readVertexLabel(HugeGraph graph, BackendEntry entry);

    public BackendEntry writeEdgeLabel(EdgeLabel edgeLabel);
    public EdgeLabel readEdgeLabel(HugeGraph graph, BackendEntry entry);

    public BackendEntry writePropertyKey(PropertyKey propertyKey);
    public PropertyKey readPropertyKey(HugeGraph graph, BackendEntry entry);

    public BackendEntry writeIndexLabel(IndexLabel indexLabel);
    public IndexLabel readIndexLabel(HugeGraph graph, BackendEntry entry);
}

我们看这个AbstractSerializer 具体的一个实现类。BinarySerializer

 public BackendEntry writeVertex(HugeVertex vertex) {
        BinaryBackendEntry entry = newBackendEntry(vertex);

        if (vertex.removed()) {
            return entry;
        }

        int propsCount = vertex.getProperties().size();
        BytesBuffer buffer = BytesBuffer.allocate(8 + 16 * propsCount);

        // Write vertex label
        buffer.writeId(vertex.schemaLabel().id());

        // Write all properties of the vertex
        this.formatProperties(vertex.getProperties().values(), buffer);

        // Write vertex expired time if needed
        if (vertex.hasTtl()) {
            entry.ttl(vertex.ttl());
            this.formatExpiredTime(vertex.expiredTime(), buffer);
        }

        // Fill column
        byte[] name = this.keyWithIdPrefix ? entry.id().asBytes() : EMPTY_BYTES;
        entry.column(name, buffer.bytes());

        return entry;
    }

可以看到他序列化成二进制的时候是
label_id(1B)
prop_total_szie(属性个数总数)
prop_schema_id+prop_val()(2B)
如果是hbase的话还有个rowkey
如果是TableSerializer那么

public BackendEntry writeVertex(HugeVertex vertex) {
        TableBackendEntry entry = newBackendEntry(vertex);
        if (vertex.hasTtl()) {
            entry.ttl(vertex.ttl());
            entry.column(HugeKeys.EXPIRED_TIME, vertex.expiredTime());
        }
        entry.column(HugeKeys.ID, this.writeId(vertex.id()));
        entry.column(HugeKeys.LABEL, vertex.schemaLabel().id().asLong());
        // Add all properties of a Vertex
        this.formatProperties(vertex, entry.row());
        return entry;
    }
    mysql实现
     protected void formatProperties(HugeElement element,
                                    TableBackendEntry.Row row) {
        Map<Number, Object> properties = new HashMap<>();
        // Add all properties of a Vertex
        for (HugeProperty<?> prop : element.getProperties().values()) {
            Number key = prop.propertyKey().id().asLong();
            Object val = prop.value();
            properties.put(key, val);
        }
        row.column(HugeKeys.PROPERTIES, JsonUtil.toJson(properties));
    }

可以看到返回的是backentry,主要是的id,label,entry_row 的是props 和json string.
真正提交的时候是entry 加入mute中。最终mysql存储执行

  private void mutate(Session session, BackendAction item) {
        MysqlBackendEntry entry = castBackendEntry(item.entry());
        MysqlTable table = this.table(entry.type());

        switch (item.action()) {
            case INSERT:
                table.insert(session, entry.row());
                break;
            case DELETE:
                table.delete(session, entry.row());
                break;
            case APPEND:
                table.append(session, entry.row());
                break;
            case ELIMINATE:
                table.eliminate(session, entry.row());
                break;
            default:
                throw new AssertionError(String.format(
                          "Unsupported mutate action: %s", item.action()));
        }
    }
    public void insert(Session session, MysqlBackendEntry.Row entry) {
        String template = this.buildInsertTemplate(entry);

        PreparedStatement insertStmt;
        try {
            // Create or get insert prepare statement
            insertStmt = session.prepareStatement(template);
            int i = 1;
            for (Object object : this.buildInsertObjects(entry)) {
                insertStmt.setObject(i++, object);
            }
        } catch (SQLException e) {
            throw new BackendException("Failed to prepare statement '%s'" +
                                       "for entry: %s", template, entry);
        }
        session.add(insertStmt);
    }

其实是将entry 转换成mysql 语句的操作

@Override
    public void insert(Session session, BackendEntry entry) {
        assert !entry.columns().isEmpty();
        session.put(this.table(), CF, entry.id().asBytes(), entry.columns());
    }

hbase则更简单了将其直接一个put 搞定

Responses