starrocks 元数据管理
in bigdata with 0 comment

starrocks 元数据管理

in bigdata with 0 comment

Starrocks 元数据管理的综述

  1. starrocks的元数据管理主要是依赖EditLog和checkpoint实现的。首先解释下什么是editlog editlog就是在操作数据的过程中每一步的操作日志,那操作日志长什么样的 大概是操作码 操作参数/数据。操作码对应每一个操作,比如创建数据库,删除数据等。那checkpoint又是什么checkoint就是将这些数据写入到磁盘。
  2. starrocks的catalog目前包含job信息,集群信息,数据库信息。
  3. 所有节点中只有master有权限进行editlog的写入和处理,非主节点会将元数据的请求转到master进行处理。
  4. master对editlog是如何处理的?editlog首先是写入到内存,当达到一定的量之后会借助bdb写入本地磁盘。然后同步给其它的节点,其它的节点借助bdb进行数据的回放,恢复到其它节点的内存中。
  5. 如果bdb中的数据不断增加,不进行清除那么数据就会无限膨胀,为了解决这个问题引入了checkpoint的机制定时对bdb中的数据进行的快照。
  6. 在写入bdb的时候每50000条数据就会roll到下一个database,database的名字是最小journalId。
  7. FE定时对bdb的数据生成全量的image,生成完成后通知follower对image进行拉取和加载。image生成后会对已经包含在image中的bdb的数据进行清理。

元数据写入和复制流程

元数据写入.png

元数据checkpoint流程:

checkpoint (1).png

checkpoint的流程主要是在com.starrocks.master.Checkpoint这个类当中的。

bdb 嵌入式数据库在元数据中的具体实现

bdbje journal的实现

主要类如下:
BDBEnvironment: bdb环境初始化
BDBJEJournal Journal实现对bdb的操作
BDBJournalCursor 访问journal的区间数据迭代指针
BDBHA HA协议的实现
BDBStateChangeListener 监听BDB节点状态的变化

BDBEncvironment 中主要实现了opendatabase removeDatabase
setup方法比较复杂,主要是实现bdb本地话的配置,包括--helper发现主节点,获取集群节点。以及注册stateListener监听状态的变化

 public void setup(File envHome, String selfNodeName, String selfNodeHostPort,
                      String helperHostPort, boolean isElectable) {

        this.closing = false;

        // Almost never used, just in case the master can not restart
        if (Config.metadata_failure_recovery.equals("true")) {
            if (!isElectable) {
                LOG.error("Current node is not in the electable_nodes list. will exit");
                System.exit(-1);
            }
            DbResetRepGroup resetUtility = new DbResetRepGroup(envHome, STARROCKS_JOURNAL_GROUP, selfNodeName,
                    selfNodeHostPort);
            resetUtility.reset();
            LOG.info("group has been reset.");
        }

        // set replication config
        replicationConfig = new ReplicationConfig();
        replicationConfig.setNodeName(selfNodeName);
        replicationConfig.setNodeHostPort(selfNodeHostPort);
        replicationConfig.setHelperHosts(helperHostPort);
        replicationConfig.setGroupName(STARROCKS_JOURNAL_GROUP);
        replicationConfig.setConfigParam(ReplicationConfig.ENV_UNKNOWN_STATE_TIMEOUT, "10");
        replicationConfig.setMaxClockDelta(Config.max_bdbje_clock_delta_ms, TimeUnit.MILLISECONDS);
        replicationConfig.setConfigParam(ReplicationConfig.TXN_ROLLBACK_LIMIT,
                String.valueOf(Config.txn_rollback_limit));
        replicationConfig
                .setConfigParam(ReplicationConfig.REPLICA_TIMEOUT, Config.bdbje_heartbeat_timeout_second + " s");
        replicationConfig
                .setConfigParam(ReplicationConfig.FEEDER_TIMEOUT, Config.bdbje_heartbeat_timeout_second + " s");

        if (isElectable) {
            replicationConfig.setReplicaAckTimeout(Config.bdbje_replica_ack_timeout_second, TimeUnit.SECONDS);
            replicationConfig.setConfigParam(ReplicationConfig.REPLICA_MAX_GROUP_COMMIT, "0");
            replicationConfig.setConsistencyPolicy(new NoConsistencyRequiredPolicy());
        } else {
            replicationConfig.setNodeType(NodeType.SECONDARY);
            replicationConfig.setConsistencyPolicy(new NoConsistencyRequiredPolicy());
        }

        java.util.logging.Logger parent = java.util.logging.Logger.getLogger("com.sleepycat.je");
        parent.setLevel(Level.parse(Config.bdbje_log_level));

        // set environment config
        environmentConfig = new EnvironmentConfig();
        environmentConfig.setTransactional(true);
        environmentConfig.setAllowCreate(true);
        environmentConfig.setCachePercent(MEMORY_CACHE_PERCENT);
        environmentConfig.setLockTimeout(Config.bdbje_lock_timeout_second, TimeUnit.SECONDS);
        environmentConfig.setConfigParam(EnvironmentConfig.FILE_LOGGING_LEVEL, Config.bdbje_log_level);
        if (isElectable) {
            Durability durability = new Durability(getSyncPolicy(Config.master_sync_policy),
                    getSyncPolicy(Config.replica_sync_policy), getAckPolicy(Config.replica_ack_policy));
            environmentConfig.setDurability(durability);
        }

        // set database config
        dbConfig = new DatabaseConfig();
        dbConfig.setTransactional(true);
        if (isElectable) {
            dbConfig.setAllowCreate(true);
            dbConfig.setReadOnly(false);
        } else {
            dbConfig.setAllowCreate(false);
            dbConfig.setReadOnly(true);
        }

        // open environment and epochDB
        for (int i = 0; i < RETRY_TIME; i++) {
            try {
                // open the environment
                replicatedEnvironment = new ReplicatedEnvironment(envHome, replicationConfig, environmentConfig);

                // get replicationGroupAdmin object.
                Set<InetSocketAddress> adminNodes = new HashSet<InetSocketAddress>();
                // 1. add helper node
                HostAndPort helperAddress = HostAndPort.fromString(helperHostPort);
                InetSocketAddress helper = new InetSocketAddress(helperAddress.getHost(),
                        helperAddress.getPort());
                adminNodes.add(helper);
                LOG.info("add helper[{}] as ReplicationGroupAdmin", helperHostPort);
                // 2. add self if is electable
                if (!selfNodeHostPort.equals(helperHostPort) && Catalog.getCurrentCatalog().isElectable()) {
                    HostAndPort selfNodeAddress = HostAndPort.fromString(selfNodeHostPort);
                    InetSocketAddress self = new InetSocketAddress(selfNodeAddress.getHost(),
                            selfNodeAddress.getPort());
                    adminNodes.add(self);
                    LOG.info("add self[{}] as ReplicationGroupAdmin", selfNodeHostPort);
                }

                replicationGroupAdmin = new ReplicationGroupAdmin(STARROCKS_JOURNAL_GROUP, adminNodes);

                // get a BDBHA object and pass the reference to Catalog
                HAProtocol protocol = new BDBHA(this, selfNodeName);
                Catalog.getCurrentCatalog().setHaProtocol(protocol);

                // start state change listener
                StateChangeListener listener = new BDBStateChangeListener();
                replicatedEnvironment.setStateChangeListener(listener);

                // open epochDB. the first parameter null means auto-commit
                epochDB = new CloseSafeDatabase(replicatedEnvironment.openDatabase(null, "epochDB", dbConfig));
                break;
            } catch (InsufficientLogException insufficientLogEx) {
                NetworkRestore restore = new NetworkRestore();
                NetworkRestoreConfig config = new NetworkRestoreConfig();
                config.setRetainLogFiles(false); // delete obsolete log files.
                // Use the members returned by insufficientLogEx.getLogProviders()
                // to select the desired subset of members and pass the resulting
                // list as the argument to config.setLogProviders(), if the
                // default selection of providers is not suitable.
                restore.execute(insufficientLogEx, config);
            } catch (DatabaseException e) {
                if (i < RETRY_TIME - 1) {
                    try {
                        Thread.sleep(5 * 1000);
                    } catch (InterruptedException e1) {
                        e1.printStackTrace();
                    }
                } else {
                    LOG.error("error to open replicated environment. will exit.", e);
                    System.exit(-1);
                }
            }
        }
    }

其中bdb replication的相关文档在这 https://docs.oracle.com/cd/E17277_02/html/ReplicationGuide/BerkeleyDB-JE-Replication.pdf
BDBJournal 主要实现的是journal的接口:

    // Open the journal environment
    public void open();

    // Roll Edit file or database
    public void rollJournal();

    // Get the newest journal id 
    public long getMaxJournalId();

    // Get the oldest journal id
    public long getMinJournalId();

    // Close the environment
    public void close();

    // Get the journal which id = journalId
    public JournalEntity read(long journalId);

    // Get all the journals whose id: fromKey <= id <= toKey
    // toKey = -1 means toKey = Long.Max_Value
    public JournalCursor read(long fromKey, long toKey);

    // Write a journal and sync to disk
    public void write(short op, Writable writable);

    // Delete journals whose max id is less than deleteToJournalId
    public void deleteJournals(long deleteJournalToId);

    // Current db's min journal id - 1
    public long getFinalizedJournalId();

    // Get all the dbs' name
    public List<Long> getDatabaseNames();

注释写的非常明白其中重点需要看的是 读写方法

    @Override
    public synchronized void write(short op, Writable writable) {
        JournalEntity entity = new JournalEntity();
        entity.setOpCode(op);
        entity.setData(writable);

        // id is the key
        long id = nextJournalId.getAndIncrement();
        Long idLong = id;
        DatabaseEntry theKey = new DatabaseEntry();
        TupleBinding<Long> idBinding = TupleBinding.getPrimitiveBinding(Long.class);
        idBinding.objectToEntry(idLong, theKey);

        // entity is the value
        DataOutputBuffer buffer = new DataOutputBuffer(OUTPUT_BUFFER_INIT_SIZE);
        try {
            entity.write(buffer);
        } catch (IOException e) {
            e.printStackTrace();
        }
        DatabaseEntry theData = new DatabaseEntry(buffer.getData());
        if (MetricRepo.isInit) {
            MetricRepo.COUNTER_EDIT_LOG_SIZE_BYTES.increase((long) theData.getSize());
        }
        LOG.debug("opCode = {}, journal size = {}", op, theData.getSize());
        // Write the key value pair to bdb.
        boolean writeSuccessed = false;
        try {
            for (int i = 0; i < RETRY_TIME; i++) {
                try {
                    // Parameter null means auto commit
                    if (currentJournalDB.put(null, theKey, theData) == OperationStatus.SUCCESS) {
                        writeSuccessed = true;
                        LOG.debug("master write journal {} finished. db name {}, current time {}",
                                id, currentJournalDB.getDb().getDatabaseName(), System.currentTimeMillis());
                        break;
                    }
                } catch (DatabaseException e) {
                    LOG.error("catch an exception when writing to database. sleep and retry. journal id {}", id, e);
                    try {
                        this.wait(5 * 1000);
                    } catch (InterruptedException e1) {
                        e1.printStackTrace();
                    }
                }
            }
        } finally {
            // If write failed, set nextJournalId to the origin value.
            if (!writeSuccessed) {
                nextJournalId.set(id);
            }
        }

        if (!writeSuccessed) {
            if (op == OperationType.OP_TIMESTAMP) {
                /*
                 * Do not exit if the write operation is OP_TIMESTAMP.
                 * If all the followers exit except master, master should continue provide query service.
                 * To prevent master exit, we should exempt OP_TIMESTAMP write
                 */
                LOG.warn("master can not achieve quorum. write timestamp fail. but will not exit.");
                return;
            }
            String msg = "write bdb failed. will exit. journalId: " + id + ", bdb database Name: " +
                    currentJournalDB.getDb().getDatabaseName();
            LOG.error(msg);
            Util.stdoutWithTime(msg);
            System.exit(-1);
        }
    }
 @Override
    public JournalCursor read(long fromKey, long toKey) {
        return BDBJournalCursor.getJournalCursor(bdbEnvironment, fromKey, toKey);
    }

写入都是通过JournalEntity来实现的 JournalEntity 包含opcode 和Writable 对象。
opcode 其实就是操作类型码 具体有哪些类型码可以在com.starrocks.persist.OperationType类中看到
writable其实就是写入相应操作的信息。
看个修改表名的具体例子就明白了:

public void renameTable(Database db, OlapTable table, TableRenameClause tableRenameClause) throws DdlException {
        if (table.getState() != OlapTableState.NORMAL) {
            throw new DdlException("Table[" + table.getName() + "] is under " + table.getState());
        }

        String oldTableName = table.getName();
        String newTableName = tableRenameClause.getNewTableName();
        if (oldTableName.equals(newTableName)) {
            throw new DdlException("Same table name");
        }

        // check if name is already used
        if (db.getTable(newTableName) != null) {
            throw new DdlException("Table name[" + newTableName + "] is already used");
        }

        table.checkAndSetName(newTableName, false);

        db.dropTable(oldTableName);
        db.createTable(table);

        TableInfo tableInfo = TableInfo.createForTableRename(db.getId(), table.getId(), newTableName);
        editLog.logTableRename(tableInfo);
        LOG.info("rename table[{}] to {}, tableId: {}", oldTableName, newTableName, table.getId());
    }

rename table 最后调用了editLog的logTableRename 修改元数据
logTableRename的实现:

    public void logTableRename(TableInfo tableInfo) {
        logEdit(OperationType.OP_RENAME_TABLE, tableInfo);
    }

通过logEdit实现的传进去的是opcode 和tableInfo对象.

public class TableInfo implements Writable {

    private long dbId;
    private long tableId;
    private long indexId;
    private long partitionId;

    private String newTableName;
    private String newRollupName;
    private String newPartitionName;
xxxx
}

read方法的实现其实也类似:
从数据中读出相应的操作码和操作数
根据操作反序列化成相应的对象
BDBJournalCursor实现的接口 其实就是个迭代器获取结果

// This class is like JDBC ResultSet.
public interface JournalCursor {

    // Return the next journal. return null when there is no more journals
    public JournalEntity next();

    public void close();

}

元数据高可用的实现

BDBStateListener 中启动的监听对象:

 public synchronized void stateChange(StateChangeEvent sce) throws RuntimeException {
        FrontendNodeType newType = null;
        switch (sce.getState()) {
            case MASTER: {
                newType = FrontendNodeType.MASTER;
                break;
            }
            case REPLICA: {
                if (Catalog.getCurrentCatalog().isElectable()) {
                    newType = FrontendNodeType.FOLLOWER;
                } else {
                    newType = FrontendNodeType.OBSERVER;
                }
                break;
            }
            case UNKNOWN: {
                newType = FrontendNodeType.UNKNOWN;
                break;
            }
            default: {
                String msg = "this node is " + sce.getState().name();
                LOG.warn(msg);
                Util.stdoutWithTime(msg);
                return;
            }
        }
        Preconditions.checkNotNull(newType);
        Catalog.getCurrentCatalog().notifyNewFETypeTransfer(newType);
    }

最重要的是角色改变通知这个方法:

  public void notifyNewFETypeTransfer(FrontendNodeType newType) {
        try {
            String msg = "notify new FE type transfer: " + newType;
            LOG.warn(msg);
            Util.stdoutWithTime(msg);
            this.typeTransferQueue.put(newType);
        } catch (InterruptedException e) {
            LOG.error("failed to put new FE type: {}", newType, e);
        }
    }

会在typeTransferQueue 这个队列中放入相应的状态,catalog初始化的时候会创建线程监听该队列

 public void createStateListener() {
        listener = new Daemon("stateListener", STATE_CHANGE_CHECK_INTERVAL_MS) {
            @Override
            protected synchronized void runOneCycle() {

                while (true) {
                    FrontendNodeType newType = null;
                    try {
                        newType = typeTransferQueue.take();
                    } catch (InterruptedException e) {
                        LOG.error("got exception when take FE type from queue", e);
                        Util.stdoutWithTime("got exception when take FE type from queue. " + e.getMessage());
                        System.exit(-1);
                    }
                    Preconditions.checkNotNull(newType);
                    LOG.info("begin to transfer FE type from {} to {}", feType, newType);
                    if (feType == newType) {
                        return;
                    }

                    /*
                     * INIT -> MASTER: transferToMaster
                     * INIT -> FOLLOWER/OBSERVER: transferToNonMaster
                     * UNKNOWN -> MASTER: transferToMaster
                     * UNKNOWN -> FOLLOWER/OBSERVER: transferToNonMaster
                     * FOLLOWER -> MASTER: transferToMaster
                     * FOLLOWER/OBSERVER -> INIT/UNKNOWN: set isReady to false
                     */
                    switch (feType) {
                        case INIT: {
                            switch (newType) {
                                case MASTER: {
                                    transferToMaster(feType);
                                    break;
                                }
                                case FOLLOWER:
                                case OBSERVER: {
                                    transferToNonMaster(newType);
                                    break;
                                }
                                case UNKNOWN:
                                    break;
                                default:
                                    break;
                            }
                            break;
                        }
                        case UNKNOWN: {
                            switch (newType) {
                                case MASTER: {
                                    transferToMaster(feType);
                                    break;
                                }
                                case FOLLOWER:
                                case OBSERVER: {
                                    transferToNonMaster(newType);
                                    break;
                                }
                                default:
                                    break;
                            }
                            break;
                        }
                        case FOLLOWER: {
                            switch (newType) {
                                case MASTER: {
                                    transferToMaster(feType);
                                    break;
                                }
                                case UNKNOWN: {
                                    transferToNonMaster(newType);
                                    break;
                                }
                                default:
                                    break;
                            }
                            break;
                        }
                        case OBSERVER: {
                            if (newType == FrontendNodeType.UNKNOWN) {
                                transferToNonMaster(newType);
                            }
                            break;
                        }
                        case MASTER: {
                            // exit if master changed to any other type
                            String msg = "transfer FE type from MASTER to " + newType.name() + ". exit";
                            LOG.error(msg);
                            Util.stdoutWithTime(msg);
                            System.exit(-1);
                        }
                        default:
                            break;
                    } // end switch formerFeType

                    feType = newType;
                    LOG.info("finished to transfer FE type to {}", feType);
                }
            } // end runOneCycle
        };

        listener.setMetaContext(metaContext);
    }

当队列的状态发生改变的时候进行相应的操作: transfertoMaster transfertoNonMaster
transfertoMaster:

  1. 停止replayer
  2. open and roll editlog
  3. startMasterOnlyDaemonThreads and startNonMasterDaemonThreads
private void transferToMaster(FrontendNodeType oldType) {
        // stop replayer
        if (replayer != null) {
            replayer.exit();
            try {
                replayer.join();
            } catch (InterruptedException e) {
                LOG.warn("got exception when stopping the replayer thread", e);
            }
            replayer = null;
        }

        // set this after replay thread stopped. to avoid replay thread modify them.
        isReady.set(false);
        canRead.set(false);

        editLog.open();

        if (!haProtocol.fencing()) {
            LOG.error("fencing failed. will exit.");
            System.exit(-1);
        }

        long replayStartTime = System.currentTimeMillis();
        // replay journals. -1 means replay all the journals larger than current journal id.
        replayJournal(-1);
        long replayEndTime = System.currentTimeMillis();
        LOG.info("finish replay in " + (replayEndTime - replayStartTime) + " msec");

        checkCurrentNodeExist();

        editLog.rollEditLog();

        // Set the feType to MASTER before writing edit log, because the feType must be Master when writing edit log.
        // It will be set to the old type if any error happens in the following procedure
        feType = FrontendNodeType.MASTER;
        try {
            // Log meta_version
            int communityMetaVersion = MetaContext.get().getMetaVersion();
            int starrocksMetaVersion = MetaContext.get().getStarRocksMetaVersion();
            if (communityMetaVersion < FeConstants.meta_version ||
                    starrocksMetaVersion < FeConstants.starrocks_meta_version) {
                editLog.logMetaVersion(new MetaVersion(FeConstants.meta_version, FeConstants.starrocks_meta_version));
                MetaContext.get().setMetaVersion(FeConstants.meta_version);
                MetaContext.get().setStarRocksMetaVersion(FeConstants.starrocks_meta_version);
            }

            // Log the first frontend
            if (isFirstTimeStartUp) {
                // if isFirstTimeStartUp is true, frontends must contains this Node.
                Frontend self = frontends.get(nodeName);
                Preconditions.checkNotNull(self);
                // OP_ADD_FIRST_FRONTEND is emitted, so it can write to BDBJE even if canWrite is false
                editLog.logAddFirstFrontend(self);
            }

            if (!isDefaultClusterCreated) {
                initDefaultCluster();
            }

            // MUST set master ip before starting checkpoint thread.
            // because checkpoint thread need this info to select non-master FE to push image
            this.masterIp = FrontendOptions.getLocalHostAddress();
            this.masterRpcPort = Config.rpc_port;
            this.masterHttpPort = Config.http_port;
            MasterInfo info = new MasterInfo(this.masterIp, this.masterHttpPort, this.masterRpcPort);
            editLog.logMasterInfo(info);

            // start all daemon threads that only running on MASTER FE
            startMasterOnlyDaemonThreads();
            // start other daemon threads that should running on all FE
            startNonMasterDaemonThreads();

            MetricRepo.init();

            canRead.set(true);
            isReady.set(true);

            String msg = "master finished to replay journal, can write now.";
            Util.stdoutWithTime(msg);
            LOG.info(msg);
            // for master, there are some new thread pools need to register metric
            ThreadPoolManager.registerAllThreadPoolMetric();
        } catch (Throwable t) {
            LOG.warn("transfer to master failed with error", t);
            feType = oldType;
            throw t;
        }
    }
Responses