An Overview of StarRocks Metadata Management
- StarRocks metadata management relies mainly on the EditLog and on checkpoints. First, what is the EditLog? It is the log of every step taken while operating on metadata; each entry is roughly an operation code plus the operation's parameters/data (see the sketch after this list). Each opcode corresponds to one operation, such as creating a database or dropping data. And what is a checkpoint? A checkpoint writes a snapshot of this data to disk.
- The StarRocks catalog currently contains job information, cluster information, and database information.
- Among all nodes, only the master may write and process the EditLog; non-master nodes forward metadata requests to the master.
- How does the master handle the EditLog? Edits are first written to memory; once a certain amount accumulates they are written to local disk via bdb (Berkeley DB Java Edition), then synchronized to the other nodes, which replay the data through bdb to restore it into their own memory.
- If the data in bdb kept growing without ever being cleaned up, it would expand without bound. To solve this, the checkpoint mechanism was introduced to periodically snapshot the data in bdb.
- When writing to bdb, the journal rolls to a new database every 50,000 entries; each database is named after the smallest journalId it contains.
- The FE periodically generates a full image from the bdb data; once it is generated, the followers are notified to pull and load the image. After the image is generated, the bdb data already covered by the image is cleaned up.
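To make the record format concrete, here is a minimal conceptual sketch of one EditLog entry: an opcode plus a serialized payload. The class and field names here are illustrative only; the real entry type is JournalEntity, shown further below.

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Illustrative sketch of one EditLog record: <opcode><payload>.
// Not the real StarRocks class; see JournalEntity later in this article.
class JournalRecordSketch {
    short opCode;   // which operation: create database, rename table, ...
    byte[] payload; // the serialized parameters/data of that operation

    byte[] serialize() throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeShort(opCode); // opcode first ...
        out.write(payload);     // ... then the operation data
        out.flush();
        return bytes.toByteArray();
    }
}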
Metadata write and replication flow
As the bullets above describe, only the master appends edits to bdb; replication to the other FE nodes and replay there are handled by bdb je's replication layer.
Metadata checkpoint flow:
The checkpoint flow is implemented mainly in the com.starrocks.master.Checkpoint class.
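At a high level, one checkpoint cycle runs roughly as follows. This is a hedged sketch of the control flow only: deleteJournals and getFinalizedJournalId come from the Journal interface shown later, while the other helper names are made up for illustration.

// Rough sketch of one checkpoint cycle (helper names are illustrative).
void doCheckpointOnce() throws Exception {
    // Only finalized (already rolled) bdb databases are safe to snapshot.
    long checkpointVersion = journal.getFinalizedJournalId();
    // Replay all edits up to that version into the in-memory catalog.
    replayJournalTo(checkpointVersion);
    // Dump the whole catalog to disk as image.<checkpointVersion>.
    writeImageFile(checkpointVersion);
    // Tell the followers to pull and load the new image over http.
    notifyOtherFrontendsToPullImage(checkpointVersion);
    // Journals already covered by the image can now be deleted from bdb.
    journal.deleteJournals(checkpointVersion + 1);
}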
How the bdb embedded database is used for metadata
The bdbje journal implementation
The main classes are as follows:
BDBEnvironment: initializes the bdb environment
BDBJEJournal: the Journal implementation that operates on bdb
BDBJournalCursor: an iterator cursor for accessing a range of journal data
BDBHA: the implementation of the HA protocol
BDBStateChangeListener: listens for BDB node state changes
BDBEnvironment mainly implements openDatabase and removeDatabase.
The setup method is fairly involved. It mainly performs the bdb configuration for the local node, including using the --helper node to discover the master and obtain the cluster members, and it registers a StateChangeListener to watch for state changes.
public void setup(File envHome, String selfNodeName, String selfNodeHostPort,
String helperHostPort, boolean isElectable) {
this.closing = false;
// Almost never used, just in case the master can not restart
if (Config.metadata_failure_recovery.equals("true")) {
if (!isElectable) {
LOG.error("Current node is not in the electable_nodes list. will exit");
System.exit(-1);
}
DbResetRepGroup resetUtility = new DbResetRepGroup(envHome, STARROCKS_JOURNAL_GROUP, selfNodeName,
selfNodeHostPort);
resetUtility.reset();
LOG.info("group has been reset.");
}
// set replication config
replicationConfig = new ReplicationConfig();
replicationConfig.setNodeName(selfNodeName);
replicationConfig.setNodeHostPort(selfNodeHostPort);
replicationConfig.setHelperHosts(helperHostPort);
replicationConfig.setGroupName(STARROCKS_JOURNAL_GROUP);
replicationConfig.setConfigParam(ReplicationConfig.ENV_UNKNOWN_STATE_TIMEOUT, "10");
replicationConfig.setMaxClockDelta(Config.max_bdbje_clock_delta_ms, TimeUnit.MILLISECONDS);
replicationConfig.setConfigParam(ReplicationConfig.TXN_ROLLBACK_LIMIT,
String.valueOf(Config.txn_rollback_limit));
replicationConfig
.setConfigParam(ReplicationConfig.REPLICA_TIMEOUT, Config.bdbje_heartbeat_timeout_second + " s");
replicationConfig
.setConfigParam(ReplicationConfig.FEEDER_TIMEOUT, Config.bdbje_heartbeat_timeout_second + " s");
if (isElectable) {
replicationConfig.setReplicaAckTimeout(Config.bdbje_replica_ack_timeout_second, TimeUnit.SECONDS);
replicationConfig.setConfigParam(ReplicationConfig.REPLICA_MAX_GROUP_COMMIT, "0");
replicationConfig.setConsistencyPolicy(new NoConsistencyRequiredPolicy());
} else {
replicationConfig.setNodeType(NodeType.SECONDARY);
replicationConfig.setConsistencyPolicy(new NoConsistencyRequiredPolicy());
}
java.util.logging.Logger parent = java.util.logging.Logger.getLogger("com.sleepycat.je");
parent.setLevel(Level.parse(Config.bdbje_log_level));
// set environment config
environmentConfig = new EnvironmentConfig();
environmentConfig.setTransactional(true);
environmentConfig.setAllowCreate(true);
environmentConfig.setCachePercent(MEMORY_CACHE_PERCENT);
environmentConfig.setLockTimeout(Config.bdbje_lock_timeout_second, TimeUnit.SECONDS);
environmentConfig.setConfigParam(EnvironmentConfig.FILE_LOGGING_LEVEL, Config.bdbje_log_level);
if (isElectable) {
Durability durability = new Durability(getSyncPolicy(Config.master_sync_policy),
getSyncPolicy(Config.replica_sync_policy), getAckPolicy(Config.replica_ack_policy));
environmentConfig.setDurability(durability);
}
// set database config
dbConfig = new DatabaseConfig();
dbConfig.setTransactional(true);
if (isElectable) {
dbConfig.setAllowCreate(true);
dbConfig.setReadOnly(false);
} else {
dbConfig.setAllowCreate(false);
dbConfig.setReadOnly(true);
}
// open environment and epochDB
for (int i = 0; i < RETRY_TIME; i++) {
try {
// open the environment
replicatedEnvironment = new ReplicatedEnvironment(envHome, replicationConfig, environmentConfig);
// get replicationGroupAdmin object.
Set<InetSocketAddress> adminNodes = new HashSet<InetSocketAddress>();
// 1. add helper node
HostAndPort helperAddress = HostAndPort.fromString(helperHostPort);
InetSocketAddress helper = new InetSocketAddress(helperAddress.getHost(),
helperAddress.getPort());
adminNodes.add(helper);
LOG.info("add helper[{}] as ReplicationGroupAdmin", helperHostPort);
// 2. add self if is electable
if (!selfNodeHostPort.equals(helperHostPort) && Catalog.getCurrentCatalog().isElectable()) {
HostAndPort selfNodeAddress = HostAndPort.fromString(selfNodeHostPort);
InetSocketAddress self = new InetSocketAddress(selfNodeAddress.getHost(),
selfNodeAddress.getPort());
adminNodes.add(self);
LOG.info("add self[{}] as ReplicationGroupAdmin", selfNodeHostPort);
}
replicationGroupAdmin = new ReplicationGroupAdmin(STARROCKS_JOURNAL_GROUP, adminNodes);
// get a BDBHA object and pass the reference to Catalog
HAProtocol protocol = new BDBHA(this, selfNodeName);
Catalog.getCurrentCatalog().setHaProtocol(protocol);
// start state change listener
StateChangeListener listener = new BDBStateChangeListener();
replicatedEnvironment.setStateChangeListener(listener);
// open epochDB. the first parameter null means auto-commit
epochDB = new CloseSafeDatabase(replicatedEnvironment.openDatabase(null, "epochDB", dbConfig));
break;
} catch (InsufficientLogException insufficientLogEx) {
NetworkRestore restore = new NetworkRestore();
NetworkRestoreConfig config = new NetworkRestoreConfig();
config.setRetainLogFiles(false); // delete obsolete log files.
// Use the members returned by insufficientLogEx.getLogProviders()
// to select the desired subset of members and pass the resulting
// list as the argument to config.setLogProviders(), if the
// default selection of providers is not suitable.
restore.execute(insufficientLogEx, config);
} catch (DatabaseException e) {
if (i < RETRY_TIME - 1) {
try {
Thread.sleep(5 * 1000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
} else {
LOG.error("error to open replicated environment. will exit.", e);
System.exit(-1);
}
}
}
}
The bdb replication documentation is available here: https://docs.oracle.com/cd/E17277_02/html/ReplicationGuide/BerkeleyDB-JE-Replication.pdf
BDBJEJournal mainly implements the Journal interface:
// Open the journal environment
public void open();
// Roll Edit file or database
public void rollJournal();
// Get the newest journal id
public long getMaxJournalId();
// Get the oldest journal id
public long getMinJournalId();
// Close the environment
public void close();
// Get the journal which id = journalId
public JournalEntity read(long journalId);
// Get all the journals whose id: fromKey <= id <= toKey
// toKey = -1 means toKey = Long.MAX_VALUE
public JournalCursor read(long fromKey, long toKey);
// Write a journal and sync to disk
public void write(short op, Writable writable);
// Delete journals whose max id is less than deleteToJournalId
public void deleteJournals(long deleteJournalToId);
// Current db's min journal id - 1
public long getFinalizedJournalId();
// Get all the dbs' name
public List<Long> getDatabaseNames();
The comments speak for themselves; the methods that deserve the closest look are read and write.
@Override
public synchronized void write(short op, Writable writable) {
JournalEntity entity = new JournalEntity();
entity.setOpCode(op);
entity.setData(writable);
// id is the key
long id = nextJournalId.getAndIncrement();
Long idLong = id;
DatabaseEntry theKey = new DatabaseEntry();
TupleBinding<Long> idBinding = TupleBinding.getPrimitiveBinding(Long.class);
idBinding.objectToEntry(idLong, theKey);
// entity is the value
DataOutputBuffer buffer = new DataOutputBuffer(OUTPUT_BUFFER_INIT_SIZE);
try {
entity.write(buffer);
} catch (IOException e) {
e.printStackTrace();
}
DatabaseEntry theData = new DatabaseEntry(buffer.getData());
if (MetricRepo.isInit) {
MetricRepo.COUNTER_EDIT_LOG_SIZE_BYTES.increase((long) theData.getSize());
}
LOG.debug("opCode = {}, journal size = {}", op, theData.getSize());
// Write the key value pair to bdb.
boolean writeSuccessed = false;
try {
for (int i = 0; i < RETRY_TIME; i++) {
try {
// Parameter null means auto commit
if (currentJournalDB.put(null, theKey, theData) == OperationStatus.SUCCESS) {
writeSuccessed = true;
LOG.debug("master write journal {} finished. db name {}, current time {}",
id, currentJournalDB.getDb().getDatabaseName(), System.currentTimeMillis());
break;
}
} catch (DatabaseException e) {
LOG.error("catch an exception when writing to database. sleep and retry. journal id {}", id, e);
try {
this.wait(5 * 1000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
}
} finally {
// If write failed, set nextJournalId to the origin value.
if (!writeSuccessed) {
nextJournalId.set(id);
}
}
if (!writeSuccessed) {
if (op == OperationType.OP_TIMESTAMP) {
/*
* Do not exit if the write operation is OP_TIMESTAMP.
* If all the followers exit except master, master should continue provide query service.
* To prevent master exit, we should exempt OP_TIMESTAMP write
*/
LOG.warn("master can not achieve quorum. write timestamp fail. but will not exit.");
return;
}
String msg = "write bdb failed. will exit. journalId: " + id + ", bdb database Name: " +
currentJournalDB.getDb().getDatabaseName();
LOG.error(msg);
Util.stdoutWithTime(msg);
System.exit(-1);
}
}
@Override
public JournalCursor read(long fromKey, long toKey) {
return BDBJournalCursor.getJournalCursor(bdbEnvironment, fromKey, toKey);
}
All writes go through a JournalEntity, which contains an opcode and a Writable object.
The opcode is simply the operation type code; the full list of opcodes can be found in the com.starrocks.persist.OperationType class.
The Writable carries the data of the corresponding operation.
A concrete example, renaming a table, makes this clear:
public void renameTable(Database db, OlapTable table, TableRenameClause tableRenameClause) throws DdlException {
if (table.getState() != OlapTableState.NORMAL) {
throw new DdlException("Table[" + table.getName() + "] is under " + table.getState());
}
String oldTableName = table.getName();
String newTableName = tableRenameClause.getNewTableName();
if (oldTableName.equals(newTableName)) {
throw new DdlException("Same table name");
}
// check if name is already used
if (db.getTable(newTableName) != null) {
throw new DdlException("Table name[" + newTableName + "] is already used");
}
table.checkAndSetName(newTableName, false);
db.dropTable(oldTableName);
db.createTable(table);
TableInfo tableInfo = TableInfo.createForTableRename(db.getId(), table.getId(), newTableName);
editLog.logTableRename(tableInfo);
LOG.info("rename table[{}] to {}, tableId: {}", oldTableName, newTableName, table.getId());
}
renameTable ultimately calls editLog.logTableRename to record the metadata change.
The implementation of logTableRename:
public void logTableRename(TableInfo tableInfo) {
logEdit(OperationType.OP_RENAME_TABLE, tableInfo);
}
It is implemented via logEdit, passing in the opcode and the TableInfo object.
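logEdit is essentially a thin wrapper that hands the opcode and the Writable to the journal; a minimal sketch under that assumption (the real method also does bookkeeping such as transaction counters and metrics):

// Illustrative sketch of what logEdit boils down to.
private synchronized void logEdit(short op, Writable writable) {
    journal.write(op, writable); // BDBJEJournal.write shown above: serialize and put into bdb
    numTransactions++;           // bookkeeping; illustrative
}

The TableInfo object being passed in looks like this: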
public class TableInfo implements Writable {
private long dbId;
private long tableId;
private long indexId;
private long partitionId;
private String newTableName;
private String newRollupName;
private String newPartitionName;
// ...
}
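A Writable only has to know how to serialize itself and how to be read back. The following is a simplified sketch of what TableInfo's serialization might look like, assuming the FE's Text string helper; the real class handles more fields and versioning.

// Simplified sketch of the Writable contract for TableInfo (illustrative).
public void write(DataOutput out) throws IOException {
    out.writeLong(dbId);
    out.writeLong(tableId);
    Text.writeString(out, newTableName); // Text: the FE's length-prefixed string codec
}

public void readFields(DataInput in) throws IOException {
    dbId = in.readLong();
    tableId = in.readLong();
    newTableName = Text.readString(in);
}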
The read method works in mirror fashion, as sketched right after this list:
- read the opcode and the operation data from the entry
- deserialize it into the corresponding object according to the opcode
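Put together, the read side is a dispatch on the opcode. A sketch of that dispatch (abbreviated and illustrative; in the real JournalEntity the switch covers every opcode in OperationType):

// Sketch of read-side dispatch: read the opcode, then deserialize the payload
// into the matching object type (abbreviated; illustrative).
public void readFields(DataInput in) throws IOException {
    opCode = in.readShort();
    switch (opCode) {
        case OperationType.OP_RENAME_TABLE: {
            data = new TableInfo();
            ((TableInfo) data).readFields(in);
            break;
        }
        // ... one case per opcode in com.starrocks.persist.OperationType
        default:
            throw new IOException("unknown opcode: " + opCode);
    }
}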
BDBJournalCursor implements the JournalCursor interface, which is just an iterator for fetching results:
// This class is like JDBC ResultSet.
public interface JournalCursor {
// Return the next journal. return null when there is no more journals
public JournalEntity next();
public void close();
}
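Typical usage is plain iterator style. A sketch of how a replayer might consume a range of journals with the read(fromKey, toKey) method shown earlier; journal, replayedJournalId, catalog, and loadJournal are assumed names standing in for the replayer's context:

// Sketch: replay all journals newer than replayedJournalId (illustrative).
JournalCursor cursor = journal.read(replayedJournalId + 1, -1); // -1: up to Long.MAX_VALUE
try {
    JournalEntity entity;
    while ((entity = cursor.next()) != null) { // null means no more journals
        loadJournal(catalog, entity);          // apply one edit to the in-memory catalog
    }
} finally {
    cursor.close();
}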
The implementation of metadata high availability
The state-change callback registered by BDBStateChangeListener:
public synchronized void stateChange(StateChangeEvent sce) throws RuntimeException {
FrontendNodeType newType = null;
switch (sce.getState()) {
case MASTER: {
newType = FrontendNodeType.MASTER;
break;
}
case REPLICA: {
if (Catalog.getCurrentCatalog().isElectable()) {
newType = FrontendNodeType.FOLLOWER;
} else {
newType = FrontendNodeType.OBSERVER;
}
break;
}
case UNKNOWN: {
newType = FrontendNodeType.UNKNOWN;
break;
}
default: {
String msg = "this node is " + sce.getState().name();
LOG.warn(msg);
Util.stdoutWithTime(msg);
return;
}
}
Preconditions.checkNotNull(newType);
Catalog.getCurrentCatalog().notifyNewFETypeTransfer(newType);
}
The most important piece is the role-change notification method:
public void notifyNewFETypeTransfer(FrontendNodeType newType) {
try {
String msg = "notify new FE type transfer: " + newType;
LOG.warn(msg);
Util.stdoutWithTime(msg);
this.typeTransferQueue.put(newType);
} catch (InterruptedException e) {
LOG.error("failed to put new FE type: {}", newType, e);
}
}
It puts the new state onto the typeTransferQueue; when the catalog initializes, it creates a thread that watches this queue:
public void createStateListener() {
listener = new Daemon("stateListener", STATE_CHANGE_CHECK_INTERVAL_MS) {
@Override
protected synchronized void runOneCycle() {
while (true) {
FrontendNodeType newType = null;
try {
newType = typeTransferQueue.take();
} catch (InterruptedException e) {
LOG.error("got exception when take FE type from queue", e);
Util.stdoutWithTime("got exception when take FE type from queue. " + e.getMessage());
System.exit(-1);
}
Preconditions.checkNotNull(newType);
LOG.info("begin to transfer FE type from {} to {}", feType, newType);
if (feType == newType) {
return;
}
/*
* INIT -> MASTER: transferToMaster
* INIT -> FOLLOWER/OBSERVER: transferToNonMaster
* UNKNOWN -> MASTER: transferToMaster
* UNKNOWN -> FOLLOWER/OBSERVER: transferToNonMaster
* FOLLOWER -> MASTER: transferToMaster
* FOLLOWER/OBSERVER -> INIT/UNKNOWN: set isReady to false
*/
switch (feType) {
case INIT: {
switch (newType) {
case MASTER: {
transferToMaster(feType);
break;
}
case FOLLOWER:
case OBSERVER: {
transferToNonMaster(newType);
break;
}
case UNKNOWN:
break;
default:
break;
}
break;
}
case UNKNOWN: {
switch (newType) {
case MASTER: {
transferToMaster(feType);
break;
}
case FOLLOWER:
case OBSERVER: {
transferToNonMaster(newType);
break;
}
default:
break;
}
break;
}
case FOLLOWER: {
switch (newType) {
case MASTER: {
transferToMaster(feType);
break;
}
case UNKNOWN: {
transferToNonMaster(newType);
break;
}
default:
break;
}
break;
}
case OBSERVER: {
if (newType == FrontendNodeType.UNKNOWN) {
transferToNonMaster(newType);
}
break;
}
case MASTER: {
// exit if master changed to any other type
String msg = "transfer FE type from MASTER to " + newType.name() + ". exit";
LOG.error(msg);
Util.stdoutWithTime(msg);
System.exit(-1);
}
default:
break;
} // end switch formerFeType
feType = newType;
LOG.info("finished to transfer FE type to {}", feType);
}
} // end runOneCycle
};
listener.setMetaContext(metaContext);
}
When a new state arrives on the queue, the corresponding transition runs: transferToMaster or transferToNonMaster.
transferToMaster:
- stop the replayer
- open and roll the edit log
- startMasterOnlyDaemonThreads and startNonMasterDaemonThreads
private void transferToMaster(FrontendNodeType oldType) {
// stop replayer
if (replayer != null) {
replayer.exit();
try {
replayer.join();
} catch (InterruptedException e) {
LOG.warn("got exception when stopping the replayer thread", e);
}
replayer = null;
}
// set this after replay thread stopped. to avoid replay thread modify them.
isReady.set(false);
canRead.set(false);
editLog.open();
if (!haProtocol.fencing()) {
LOG.error("fencing failed. will exit.");
System.exit(-1);
}
long replayStartTime = System.currentTimeMillis();
// replay journals. -1 means replay all the journals larger than current journal id.
replayJournal(-1);
long replayEndTime = System.currentTimeMillis();
LOG.info("finish replay in " + (replayEndTime - replayStartTime) + " msec");
checkCurrentNodeExist();
editLog.rollEditLog();
// Set the feType to MASTER before writing edit log, because the feType must be Master when writing edit log.
// It will be set to the old type if any error happens in the following procedure
feType = FrontendNodeType.MASTER;
try {
// Log meta_version
int communityMetaVersion = MetaContext.get().getMetaVersion();
int starrocksMetaVersion = MetaContext.get().getStarRocksMetaVersion();
if (communityMetaVersion < FeConstants.meta_version ||
starrocksMetaVersion < FeConstants.starrocks_meta_version) {
editLog.logMetaVersion(new MetaVersion(FeConstants.meta_version, FeConstants.starrocks_meta_version));
MetaContext.get().setMetaVersion(FeConstants.meta_version);
MetaContext.get().setStarRocksMetaVersion(FeConstants.starrocks_meta_version);
}
// Log the first frontend
if (isFirstTimeStartUp) {
// if isFirstTimeStartUp is true, frontends must contains this Node.
Frontend self = frontends.get(nodeName);
Preconditions.checkNotNull(self);
// OP_ADD_FIRST_FRONTEND is emitted, so it can write to BDBJE even if canWrite is false
editLog.logAddFirstFrontend(self);
}
if (!isDefaultClusterCreated) {
initDefaultCluster();
}
// MUST set master ip before starting checkpoint thread.
// because checkpoint thread need this info to select non-master FE to push image
this.masterIp = FrontendOptions.getLocalHostAddress();
this.masterRpcPort = Config.rpc_port;
this.masterHttpPort = Config.http_port;
MasterInfo info = new MasterInfo(this.masterIp, this.masterHttpPort, this.masterRpcPort);
editLog.logMasterInfo(info);
// start all daemon threads that only running on MASTER FE
startMasterOnlyDaemonThreads();
// start other daemon threads that should running on all FE
startNonMasterDaemonThreads();
MetricRepo.init();
canRead.set(true);
isReady.set(true);
String msg = "master finished to replay journal, can write now.";
Util.stdoutWithTime(msg);
LOG.info(msg);
// for master, there are some new thread pools need to register metric
ThreadPoolManager.registerAllThreadPoolMetric();
} catch (Throwable t) {
LOG.warn("transfer to master failed with error", t);
feType = oldType;
throw t;
}
}