HugeGraph's startup logic lives mainly in the HugeGraphServer constructor:
public HugeGraphServer(String gremlinServerConf, String restServerConf)
                       throws Exception {
    // Only switch on security manager after HugeGremlinServer started
    SecurityManager securityManager = System.getSecurityManager();
    System.setSecurityManager(null);
    try {
        // Start GremlinServer
        this.gremlinServer = HugeGremlinServer.start(gremlinServerConf); // start the Gremlin server
    } catch (Throwable e) {
        LOG.error("HugeGremlinServer start error: ", e);
        HugeFactory.shutdown(30L);
        throw e;
    } finally {
        System.setSecurityManager(securityManager);
    }
    try {
        // Start HugeRestServer
        this.restServer = HugeRestServer.start(restServerConf); // start the REST server
    } catch (Throwable e) {
        LOG.error("HugeRestServer start error: ", e);
        try {
            this.gremlinServer.stop().get();
        } catch (Throwable t) {
            LOG.error("GremlinServer stop error: ", t);
        }
        HugeFactory.shutdown(30L);
        throw e;
    }
}
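For context, here is a hedged sketch of how this constructor is typically invoked from the entry point. The config file names below are the stock ones shipped under conf/, and the real main() in hugegraph-dist also installs shutdown hooks; treat the details as assumptions.
// Hedged sketch of the bootstrap, not the actual main() body
public static void main(String[] args) throws Exception {
    String gremlinConf = args.length > 0 ? args[0] : "conf/gremlin-server.yaml";
    String restConf = args.length > 1 ? args[1] : "conf/rest-server.properties";
    HugeGraphServer server = new HugeGraphServer(gremlinConf, restConf);
    // The real entry point keeps the process alive and stops both
    // servers from a JVM shutdown hook.
}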
Let's trace the startup methods of HugeGremlinServer and the REST server.
HugeGremlinServer: start -> startWithInjectTraversal
settings = Settings.read(conf); // read the configuration
ContextGremlinServer server = new ContextGremlinServer(settings); // build the server context from the settings
// Inject customized traversal source
server.injectTraversalSource(G_PREFIX); // bind a traversal source per graph, named with the prefix
server.start().exceptionally(t -> {
    LOG.error("Gremlin Server was unable to start and will " +
              "shutdown now: {}", t.getMessage());
    server.stop().join();
    throw new HugeException("Failed to start Gremlin Server");
}).join(); // start the server and block until startup completes
Let's take a closer look at a few methods of ContextGremlinServer:
public ContextGremlinServer(final Settings settings) {
    /*
     * pass custom Executor https://github.com/apache/tinkerpop/pull/813
     */
    super(settings, newGremlinExecutorService(settings)); // build the executor (thread pool)
}

public void injectAuthGraph() {
    HugeGraphAuthProxy.setContext(Context.admin());
    GraphManager manager = this.getServerGremlinExecutor()
                               .getGraphManager();
    for (String name : manager.getGraphNames()) {
        Graph graph = manager.getGraph(name);
        graph = new HugeGraphAuthProxy((HugeGraph) graph);
        manager.putGraph(name, graph);
    }
}

public void injectTraversalSource(String prefix) {
    GraphManager manager = this.getServerGremlinExecutor()
                               .getGraphManager();
    for (String graph : manager.getGraphNames()) {
        GraphTraversalSource g = manager.getGraph(graph).traversal(); // open a GraphTraversalSource for each configured graph
        String gName = prefix + graph;
        if (manager.getTraversalSource(gName) != null) {
            throw new HugeException(
                      "Found existing name '%s' in global bindings, " +
                      "it may lead to gremlin query error.", gName);
        }
        // Add a traversal source for all graphs with customed rule.
        manager.putTraversalSource(gName, g); // cache the source in the manager
    }
}

static ExecutorService newGremlinExecutorService(Settings settings) { // build the server's thread pool
    if (settings.gremlinPool == 0) {
        settings.gremlinPool = Runtime.getRuntime().availableProcessors();
    }
    int size = settings.gremlinPool;
    ThreadFactory factory = ThreadFactoryUtil.create("exec-%d");
    return new ContextThreadPoolExecutor(size, size, factory);
}
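To see why the injected sources matter: each configured graph gets an extra traversal source named prefix + graph in the global Gremlin bindings. As a hedged usage example (the prefix value "__g_" and the graph name "hugegraph" are assumptions here), a client could query that source directly over the Gremlin driver:
import java.util.List;
import org.apache.tinkerpop.gremlin.driver.Client;
import org.apache.tinkerpop.gremlin.driver.Cluster;
import org.apache.tinkerpop.gremlin.driver.Result;

public class InjectedSourceExample {
    public static void main(String[] args) throws Exception {
        // Connect to the Gremlin Server started above (default port 8182)
        Cluster cluster = Cluster.build("localhost").port(8182).create();
        Client client = cluster.connect();
        try {
            // "__g_hugegraph" is the injected source, assuming the prefix
            // is "__g_" and the graph is named "hugegraph"
            List<Result> results =
                    client.submit("__g_hugegraph.V().limit(2)").all().get();
            results.forEach(r -> System.out.println(r.getObject()));
        } finally {
            client.close();
            cluster.close();
        }
    }
}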
That wraps up the GremlinServer startup. Now let's follow the rest-server.
RestServer
RegisterUtil.registerServer(); // register rest-server options (reflectively loads the configured option classes)
// Start RestServer
return RestServer.start(conf); // start the HTTP service
The call chain is RestServer -> start -> configHttpServer -> start a Grizzly HTTP server.
So HugeGraph serves its REST API on Grizzly, the HTTP server from the GlassFish stack.
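The RestServer is essentially a Jersey (JAX-RS) application hosted on Grizzly. As a rough illustration of what configHttpServer boils down to (a generic Jersey/Grizzly sketch, not HugeGraph's actual code; the package name and URL are assumptions):
import java.net.URI;
import org.glassfish.grizzly.http.server.HttpServer;
import org.glassfish.jersey.grizzly2.httpserver.GrizzlyHttpServerFactory;
import org.glassfish.jersey.server.ResourceConfig;

public class GrizzlyRestSketch {
    public static void main(String[] args) throws Exception {
        // Scan the REST resource classes (package name assumed here)
        ResourceConfig rc = new ResourceConfig().packages("com.baidu.hugegraph.api");
        // Bind to the configured restserver.url and start Grizzly
        HttpServer server = GrizzlyHttpServerFactory.createHttpServer(
                            URI.create("http://127.0.0.1:8080/"), rc, false);
        server.start();
        // ... serve requests until shutdown() is called
    }
}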
That basically covers the server startup. Next, let's look at how vertices and edges are created through the API.
Building the graph via the API
public String create(@Context GraphManager manager,
                     @PathParam("graph") String graph,
                     JsonVertex jsonVertex,
                     @Context Request re) {
    LOG.trace("Graph [{}] create vertex: {}", graph, jsonVertex);
    checkCreatingBody(jsonVertex); // validate the JSON request body
    LOG.debug("Rest-POST-create: URL={}, Query={}, Addr={}, User={}",
              re.getRequestURL(), re.getQueryString(), re.getRemoteAddr(),
              re.getRemoteUser());
    HugeGraph g = graph(manager, graph); // look up the HugeGraph instance
    Vertex vertex = commit(g, () -> g.addVertex(jsonVertex.properties())); // commit the add-vertex request
    return manager.serializer(g).writeVertex(vertex); // serialize the vertex via the manager
}
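For reference, a request hitting this endpoint might look like the following. The URL (host, port and /apis prefix) and the body format are assumptions based on a typical HugeGraph deployment, so adjust them to your rest-server.properties:
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class CreateVertexExample {
    public static void main(String[] args) throws Exception {
        // Example body: a "person" vertex with two properties
        String json = "{\"label\": \"person\", "
                    + "\"properties\": {\"name\": \"marko\", \"age\": 29}}";
        // URL is illustrative; prefix and port depend on the deployment
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://127.0.0.1:8080/apis/graphs/hugegraph/graph/vertices"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}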
The key part is how the graph object is obtained: it is fetched through the GraphManager via graph(manager, graph). So how is the GraphManager itself created? Its constructor does the following:
this.graphs = new ConcurrentHashMap<>();
this.authenticator = HugeAuthenticator.loadAuthenticator(conf);
this.loadGraphs(conf.getMap(ServerOptions.GRAPHS));
// this.installLicense(conf, "");
// Raft will load snapshot firstly then launch election and replay log
this.waitGraphsStarted();
this.checkBackendVersionOrExit();
this.serverStarted(conf);
this.addMetrics(conf);
The key call is loadGraphs; tracing into it we find:
final Graph graph = GraphFactory.open(path);
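For context, the surrounding loop in loadGraphs roughly looks like the sketch below (hedged; the real method differs in details and error handling across versions). Each entry of the GRAPHS map is a graph name plus the path to that graph's properties file:
// Hedged sketch of GraphManager.loadGraphs(), not the verbatim source
private void loadGraphs(Map<String, String> graphConfs) {
    for (Map.Entry<String, String> entry : graphConfs.entrySet()) {
        String name = entry.getKey();
        String path = entry.getValue();
        final Graph graph = GraphFactory.open(path); // the line traced above
        this.graphs.put(name, graph);
    }
}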
So the Graph is created through the factory pattern.
This GraphFactory delegates to the factory class configured in the graph's properties file, com.baidu.hugegraph.HugeFactory.
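For reference, the relevant lines of a graph's properties file (e.g. hugegraph.properties) might look like this; the values are illustrative for the HBase setup discussed below:
# which factory TinkerPop's GraphFactory should call
gremlin.graph=com.baidu.hugegraph.HugeFactory
# backend type and graph (store) name read by HugeFactory.open()
backend=hbase
serializer=hbase
store=hugegraph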
Let's see what HugeFactory's open method actually does:
String name = config.get(CoreOptions.STORE);
checkGraphName(name, "graph config(like hugegraph.properties)");
name = name.toLowerCase();
HugeGraph graph = graphs.get(name);
if (graph == null || graph.closed()) {
    graph = new StandardHugeGraph(config);
    graphs.put(name, graph);
} else {
    String backend = config.get(CoreOptions.BACKEND);
    E.checkState(backend.equalsIgnoreCase(graph.backend()),
                 "Graph name '%s' has been used by backend '%s'",
                 name, graph.backend());
}
We can see that a StandardHugeGraph object is constructed and put into the graphs map. Now let's look at how a StandardHugeGraph is built.
public StandardHugeGraph(HugeConfig config) {
    this.params = new StandardHugeGraphParams();
    this.configuration = config;
    this.schemaEventHub = new EventHub("schema");
    this.graphEventHub = new EventHub("graph");
    this.indexEventHub = new EventHub("index");
    final int writeLimit = config.get(CoreOptions.RATE_LIMIT_WRITE);
    this.writeRateLimiter = writeLimit > 0 ?
                            RateLimiter.create(writeLimit) : null;
    final int readLimit = config.get(CoreOptions.RATE_LIMIT_READ);
    this.readRateLimiter = readLimit > 0 ?
                           RateLimiter.create(readLimit) : null;
    boolean ramtableEnable = config.get(CoreOptions.QUERY_RAMTABLE_ENABLE);
    if (ramtableEnable) {
        long vc = config.get(CoreOptions.QUERY_RAMTABLE_VERTICES_CAPACITY);
        int ec = config.get(CoreOptions.QUERY_RAMTABLE_EDGES_CAPACITY);
        this.ramtable = new RamTable(this, vc, ec);
    } else {
        this.ramtable = null;
    }
    this.taskManager = TaskManager.instance();
    this.features = new HugeFeatures(this, true);
    this.name = config.get(CoreOptions.STORE);
    this.started = false;
    this.closed = false;
    this.mode = GraphMode.NONE;
    LockUtil.init(this.name);
    try {
        this.storeProvider = this.loadStoreProvider();
    } catch (BackendException e) {
        LockUtil.destroy(this.name);
        String message = "Failed to load backend store provider";
        LOG.error("{}: {}", message, e.getMessage());
        throw new HugeException(message);
    }
    this.tx = new TinkerPopTransaction(this);
    SnowflakeIdGenerator.init(this.params);
    this.taskManager.addScheduler(this.params);
    this.userManager = new StandardUserManager(this.params);
    this.variables = null;
}
The constructor initializes a lot of state, but the two key calls are:
this.storeProvider = this.loadStoreProvider();
this.tx = new TinkerPopTransaction(this);
private BackendStoreProvider loadStoreProvider() {
    return BackendProviderFactory.open(this.params);
}
public static BackendStoreProvider open(HugeGraphParams params) {
    HugeConfig config = params.configuration();
    String backend = config.get(CoreOptions.BACKEND).toLowerCase(); // backend type, e.g. "hbase"
    String graph = config.get(CoreOptions.STORE);                   // store (graph) name
    boolean raftMode = config.get(CoreOptions.RAFT_MODE);
    BackendStoreProvider provider = newProvider(config);            // build the provider for the backend type
    if (raftMode) {
        LOG.info("Opening backend store '{}' in raft mode for graph '{}'",
                 backend, graph);
        provider = new RaftBackendStoreProvider(provider, params);
    }
    provider.open(graph);
    return provider;
}
private static BackendStoreProvider newProvider(HugeConfig config) {
    String backend = config.get(CoreOptions.BACKEND).toLowerCase();
    String graph = config.get(CoreOptions.STORE);
    if (InMemoryDBStoreProvider.matchType(backend)) {
        return InMemoryDBStoreProvider.instance(graph);
    }
    Class<? extends BackendStoreProvider> clazz = providers.get(backend);
    BackendException.check(clazz != null,
                           "Not exists BackendStoreProvider: %s", backend);
    assert BackendStoreProvider.class.isAssignableFrom(clazz);
    BackendStoreProvider instance = null;
    try {
        instance = clazz.newInstance();
    } catch (Exception e) {
        throw new BackendException(e);
    }
    BackendException.check(backend.equals(instance.type()),
                           "BackendStoreProvider with type '%s' " +
                           "can't be opened by key '%s'",
                           instance.type(), backend);
    return instance;
}
So the BackendStoreProvider object is constructed according to the configured backend type. In RegisterUtil we can see which providers are supported.
For example, for the HBase backend we use:
public static void registerHBase() {
    // Register config
    OptionSpace.register("hbase",
            "com.baidu.hugegraph.backend.store.hbase.HbaseOptions");
    // Register serializer
    SerializerFactory.register("hbase",
            "com.baidu.hugegraph.backend.store.hbase.HbaseSerializer");
    // Register backend
    BackendProviderFactory.register("hbase",
            "com.baidu.hugegraph.backend.store.hbase.HbaseStoreProvider");
}
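For completeness, here is a minimal sketch of what a register() call like BackendProviderFactory.register presumably does (an assumption, not the actual source): resolve the class by name and record it in the providers map that newProvider() consults.
// Hedged sketch, not the actual HugeGraph implementation
public static void register(String name, String classPath) {
    try {
        Class<? extends BackendStoreProvider> clazz =
                Class.forName(classPath).asSubclass(BackendStoreProvider.class);
        providers.put(name, clazz);
    } catch (ClassNotFoundException e) {
        throw new BackendException(e);
    }
}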
Since the configuration registers HbaseStoreProvider for the 'hbase' backend, the provider we get is an HbaseStoreProvider instance.
There is also a raft mode; we'll look at when Raft-based storage is used in a later article.
As for the transaction, at this point it is just a newly constructed TinkerPopTransaction object; how it is actually used will be covered in the next installment.