StarRocks SQL Parsing and Execution


Starting the StarRocks QE

QE is a server that implements the MySQL protocol. On top of the original implementation, StarRocks added an NIO-based MySQL server to improve query response time.
Classes involved in starting the QE:
NMysqlServer: the NIO version of the MySQL protocol server
AcceptListener: the listener that accepts incoming connections
ConnectScheduler: the connection scheduler
ConnectProcessor: the connection processor
Let's walk through how a MySQL connection is handled.
1. Starting NMysqlServer:

    public boolean start() {
        try {
            server = xnioWorker.createStreamConnectionServer(new InetSocketAddress(port),
                    acceptListener,
                    OptionMap.create(Options.TCP_NODELAY, true, Options.BACKLOG, Config.mysql_nio_backlog_num));
            server.resumeAccepts();
            running = true;
            LOG.info("Open mysql server success on {}", port);
            return true;
        } catch (IOException e) {
            LOG.warn("Open MySQL network service failed.", e);
            return false;
        }
    }

The XnioWorker creates the stream connection server via createStreamConnectionServer; incoming connection requests are accepted and handled by the acceptListener.
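
For context, here is a minimal, self-contained sketch of how an XNIO accept-listener server is wired up. This is illustrative only, not StarRocks code; the port and worker options are made-up values.

import java.net.InetSocketAddress;
import org.xnio.ChannelListener;
import org.xnio.OptionMap;
import org.xnio.Options;
import org.xnio.StreamConnection;
import org.xnio.Xnio;
import org.xnio.XnioWorker;
import org.xnio.channels.AcceptingChannel;

public class XnioAcceptDemo {
    public static void main(String[] args) throws Exception {
        // One worker owns the IO threads and the task thread pool.
        XnioWorker worker = Xnio.getInstance().createWorker(
                OptionMap.create(Options.WORKER_IO_THREADS, 2));

        // Called on an IO thread whenever a client connects.
        ChannelListener<AcceptingChannel<StreamConnection>> acceptListener = ch -> {
            try {
                StreamConnection conn = ch.accept();  // may be null on a spurious wakeup
                if (conn != null) {
                    System.out.println("accepted " + conn.getPeerAddress());
                    conn.close();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        };

        AcceptingChannel<StreamConnection> server = worker.createStreamConnectionServer(
                new InetSocketAddress(9030), acceptListener, OptionMap.EMPTY);
        server.resumeAccepts();  // start delivering accept events, as in start() above
    }
}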

The AcceptListener's connection-handling method does the following:
1. Accept the connection.
2. Build the connect context and hand the connection to a thread from the worker pool.
3. Authenticate the client.
4. Start a ConnectProcessor to handle the subsequent SQL requests.

    public void handleEvent(AcceptingChannel<StreamConnection> channel) {
        try {
            StreamConnection connection = channel.accept();
            if (connection == null) {
                return;
            }
            LOG.info("Connection established. remote={}", connection.getPeerAddress());
            // connection has been established, so need to call context.cleanup()
            // if exception happens.
            NConnectContext context = new NConnectContext(connection);
            context.setCatalog(Catalog.getCurrentCatalog());
            connectScheduler.submit(context);

            try {
                channel.getWorker().execute(() -> {
                    try {
                        // Set thread local info
                        context.setThreadLocalInfo();
                        context.setConnectScheduler(connectScheduler);
                        // authenticate check failed.
                        if (!MysqlProto.negotiate(context)) {
                            throw new AfterConnectedException("mysql negotiate failed");
                        }
                        if (connectScheduler.registerConnection(context)) {
                            MysqlProto.sendResponsePacket(context);
                            connection.setCloseListener(streamConnection -> connectScheduler.unregisterConnection(context));
                        } else {
                            context.getState().setError("Reach limit of connections");
                            MysqlProto.sendResponsePacket(context);
                            throw new AfterConnectedException("Reach limit of connections");
                        }
                        context.setStartTime();
                        ConnectProcessor processor = new ConnectProcessor(context);
                        context.startAcceptQuery(processor);
                    } catch (AfterConnectedException e) {
                        // do not need to print log for this kind of exception.
                        // just clean up the context;
                        context.cleanup();
                    } catch (Throwable e) {
                        if (e instanceof Error) {
                            LOG.error("connect processor exception because ", e);
                        } else {
                            // should be unexpected exception, so print warn log
                            LOG.warn("connect processor exception because ", e);
                        }
                        context.cleanup();
                    } finally {
                        ConnectContext.remove();
                    }
                });
            } catch (Throwable e) {
                if (e instanceof Error) {
                    LOG.error("connect processor exception because ", e);
                } else {
                    // should be unexpected exception, so print warn log
                    LOG.warn("connect processor exception because ", e);
                }
                context.cleanup();
                ConnectContext.remove();
            }
        } catch (IOException e) {
            LOG.warn("Connection accept failed.", e);
        }
    }

Two calls here matter most: connectScheduler.submit(context) and context.startAcceptQuery(processor).
submit starts the LoopHandler thread, which keeps spinning up new ConnectProcessor work for incoming requests.
startAcceptQuery registers the ReadListener and schedules reads:

    public void startAcceptQuery(NConnectContext nConnectContext, ConnectProcessor connectProcessor) {
        conn.getSourceChannel().setReadListener(new ReadListener(nConnectContext, connectProcessor));
        conn.getSourceChannel().resumeReads();
    }
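
Conceptually, the read listener suspends reads while one command is being processed and resumes them afterwards. A simplified sketch of the pattern (hypothetical class, not StarRocks' actual ReadListener):

import org.xnio.ChannelListener;
import org.xnio.conduits.ConduitStreamSourceChannel;

// Simplified sketch of the read-listener pattern used here.
class SimpleReadListener implements ChannelListener<ConduitStreamSourceChannel> {
    private final ConnectProcessor processor;

    SimpleReadListener(ConnectProcessor processor) {
        this.processor = processor;
    }

    @Override
    public void handleEvent(ConduitStreamSourceChannel channel) {
        channel.suspendReads();  // process one command at a time
        channel.getWorker().execute(() -> {
            try {
                processor.processOnce();  // read one packet and dispatch it
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                channel.resumeReads();  // ready for the next command
            }
        });
    }
}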

The key method of ReadListener is connectProcessor.processOnce(), and the heart of processOnce() is the dispatch() method, which processes one complete SQL command. The key dispatch code:

    private void dispatch() throws IOException {
        int code = packetBuf.get();
        MysqlCommand command = MysqlCommand.fromCode(code);
        if (command == null) {
            ErrorReport.report(ErrorCode.ERR_UNKNOWN_COM_ERROR);
            ctx.getState().setError("Unknown command(" + command + ")");
            LOG.warn("Unknown command(" + command + ")");
            return;
        }
        ctx.setCommand(command);
        ctx.setStartTime();

        switch (command) {
            case COM_INIT_DB:
                handleInitDb();
                break;
            case COM_QUIT:
                handleQuit();
                break;
            case COM_QUERY:
                handleQuery();
                ctx.setStartTime();
                break;
            case COM_FIELD_LIST:
                handleFieldList();
                break;
            case COM_CHANGE_USER:
                handleChangeUser();
                break;
            case COM_RESET_CONNECTION:
                handleResetConnnection();
                break;
            case COM_PING:
                handlePing();
                break;
            default:
                ctx.getState().setError("Unsupported command(" + command + ")");
                LOG.warn("Unsupported command(" + command + ")");
                break;
        }
    }
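
For reference, the MySQL wire protocol frames every command as a 4-byte header (a 3-byte little-endian payload length plus a 1-byte sequence id) followed by the payload, whose first byte is the command code that dispatch() reads from packetBuf. A hypothetical helper (not StarRocks code) showing the decoding:

import java.nio.ByteBuffer;

// Hypothetical helper: decode the header and command byte of a MySQL command packet.
public class MysqlPacketHeader {
    public static int readCommandCode(ByteBuffer buf) {
        int payloadLen = (buf.get() & 0xff)
                | ((buf.get() & 0xff) << 8)
                | ((buf.get() & 0xff) << 16);  // 3-byte little-endian length
        int sequenceId = buf.get() & 0xff;     // 1-byte sequence id
        return buf.get() & 0xff;               // first payload byte = command code
    }
}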

To summarize so far: with XNIO's non-blocking processing model, the server accepts connections through the registered AcceptListener, reads each packet, dispatches on the SQL command it carries, and sends the result back.

SQL Parsing

Background:

The basic flow is: lexical analysis -> syntax parsing -> logical plan -> physical execution plan.
StarRocks' lexical and syntax analysis were originally done with JFlex and Java CUP.

JFlex

For lexical analysis, the keywords are defined alongside the JFlex scanner:

    private static final Map<String, Integer> keywordMap = new LinkedHashMap<String, Integer>();
    static {
        keywordMap.put("&&", new Integer(SqlParserSymbols.KW_AND));
        keywordMap.put("add", new Integer(SqlParserSymbols.KW_ADD));
        keywordMap.put("admin", new Integer(SqlParserSymbols.KW_ADMIN));
        keywordMap.put("after", new Integer(SqlParserSymbols.KW_AFTER));
        keywordMap.put("aggregate", new Integer(SqlParserSymbols.KW_AGGREGATE));
        keywordMap.put("all", new Integer(SqlParserSymbols.KW_ALL));
        keywordMap.put("alter", new Integer(SqlParserSymbols.KW_ALTER));
        keywordMap.put("and", new Integer(SqlParserSymbols.KW_AND));
        ......
    }
Similarly, tokenIdMap maps token ids to their display names:

    tokenIdMap.put(new Integer(SqlParserSymbols.IDENT), "IDENTIFIER");
    tokenIdMap.put(new Integer(SqlParserSymbols.COMMA), "COMMA");
    tokenIdMap.put(new Integer(SqlParserSymbols.BITNOT), "~");
    tokenIdMap.put(new Integer(SqlParserSymbols.LPAREN), "(");
    tokenIdMap.put(new Integer(SqlParserSymbols.RPAREN), ")");
    tokenIdMap.put(new Integer(SqlParserSymbols.LBRACKET), "[");
    tokenIdMap.put(new Integer(SqlParserSymbols.RBRACKET), "]");
    tokenIdMap.put(new Integer(SqlParserSymbols.LBRACE), "{");
    tokenIdMap.put(new Integer(SqlParserSymbols.RBRACE), "}");
    tokenIdMap.put(new Integer(SqlParserSymbols.COLON), ":");

Lexical analysis thus splits a SQL statement into individual tokens.
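
As a concrete (hedged) sketch, the generated scanner can be driven directly; next_token() is the standard JFlex/CUP entry point, and the 0L sql-mode argument mirrors the SqlScanner constructor used later in ConnectProcessor.analyze:

import java.io.StringReader;
import java_cup.runtime.Symbol;

// Hedged sketch: dump the token stream of a simple statement.
public class TokenDump {
    public static void main(String[] args) throws Exception {
        SqlScanner scanner = new SqlScanner(
                new StringReader("select a from t where a < 10"), 0L);
        Symbol token;
        while ((token = scanner.next_token()).sym != SqlParserSymbols.EOF) {
            // sym is the numeric token id (e.g. KW_SELECT), value is the lexeme if any
            System.out.println(token.sym + " -> " + token.value);
        }
    }
}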

sql_parser.cup

The grammar is defined in the CUP file:

grant_user ::=
    user_identity:user_id
    {:
        /* No password */
        RESULT = new UserDesc(user_id);
    :}
    | user_identity:user_id KW_IDENTIFIED KW_BY STRING_LITERAL:password
    {:
        /* plain text password */
        RESULT = new UserDesc(user_id, password, true);
    :}
    | user_identity:user_id KW_IDENTIFIED KW_BY KW_PASSWORD STRING_LITERAL:password
    {:
        /* hashed password */
        RESULT = new UserDesc(user_id, password, false);
    :}
    | user_identity:user_id KW_IDENTIFIED KW_WITH IDENT:auth_plugin
    {:
        /* with auth plugin */
        RESULT = new UserDesc(user_id, auth_plugin);
    :}
    | user_identity:user_id KW_IDENTIFIED KW_WITH IDENT:auth_plugin KW_AS STRING_LITERAL:auth_string
    {:
        /* with auth plugin and auth string, encrypted if auth string is password */
        RESULT = new UserDesc(user_id, auth_plugin, auth_string, false);
    :}
    | user_identity:user_id KW_IDENTIFIED KW_WITH IDENT:auth_plugin KW_BY STRING_LITERAL:auth_string
    {:
        /* with auth plugin and auth string, plain text if auth string is password */
        RESULT = new UserDesc(user_id, auth_plugin, auth_string, true);
    :}
    ;

The grant_user rule above, for example, defines which forms of a GRANT user clause are legal. Syntax parsing against this grammar produces the parse tree we usually call the AST.
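
Driving the generated parser looks roughly like this; the SqlScanner/SqlParser/SqlParserUtils combination follows the analyze() code shown later in this post, while the GRANT statement and the toSql() call are illustrative assumptions:

import java.io.StringReader;
import java.util.List;

// Hedged sketch: feed the scanner into the CUP-generated parser.
public class ParseDemo {
    public static void main(String[] args) throws Exception {
        SqlScanner input = new SqlScanner(
                new StringReader("grant select_priv on db1.* to 'user1'@'%'"), 0L);
        SqlParser parser = new SqlParser(input);
        // Each element is the AST root of one statement, e.g. a GrantStmt here.
        List<StatementBase> stmts = SqlParserUtils.getMultiStmts(parser);
        System.out.println(stmts.get(0).toSql());
    }
}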

Once the AST is built, the next step is to generate a logical plan. What is a logical plan? It converts the abstract syntax tree into relational algebra: an operator tree in which each node represents one way of computing over the data. In other words, it spells out how the data is processed at each step, as in the figure below:

Step 1: scan the two tables; step 2: join them; step 3: filter the result.
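
In text form (a sketch standing in for the original figure):

        Filter
          |
         Join
        /    \
  Scan(tableA)  Scan(tableB)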

Physical plan

The physical plan is derived from the logical plan: based on how the machines and the data are distributed, it decides which compute operations run on which machines, i.e. what each machine actually does.
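
For a distributed join, this typically means cutting the plan into fragments connected by exchange operators, along these lines (an illustrative sketch, not actual StarRocks output):

Fragment 0 (coordinator):        ResultSink <- Join <- [Exchange 1, Exchange 2]
Fragment 1 (BEs holding tableA): Scan(tableA) -> Exchange (shuffle by join key)
Fragment 2 (BEs holding tableB): Scan(tableB) -> Exchange (shuffle by join key)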

ANTLR

On top of the original JFlex and CUP implementation, StarRocks now uses the mature, industry-proven ANTLR4 engine to generate its SQL lexer and parser.
For an introduction to ANTLR, see: https://www.cnblogs.com/vivotech/p/15122322.html

SQL Execution and Optimization

Continuing with the handleQuery function from earlier:

private void handleQuery() {
    ...
    try {
        stmts = com.starrocks.sql.parser.SqlParser.parse(originStmt, ctx.getSessionVariable().getSqlMode());
    } catch (ParsingException parsingException) {
        throw new AnalysisException(parsingException.getMessage());
    } catch (Exception e) {
        stmts = analyze(originStmt);
    }
    ...
    executor = new StmtExecutor(ctx, parsedStmt);
    ctx.setExecutor(executor);
    executor.execute();
}

The analyze method of the ConnectProcessor class:

Input: the SQL string
Output: a list of StatementBase AST nodes

// analyze the origin stmt and return multi-statements
private List<StatementBase> analyze(String originStmt) throws AnalysisException {
    ...
    SqlScanner input = new SqlScanner(new StringReader(originStmt), ctx.getSessionVariable().getSqlMode());
    SqlParser parser = new SqlParser(input);
    try {
        return SqlParserUtils.getMultiStmts(parser);
    } catch
    ...
}

The AST is tree-structured. Each node is a statement type, such as SelectStmt, InsertStmt, ShowStmt, SetStmt, AlterStmt, AlterTableStmt, or CreateTableStmt; each part of the SQL is parsed into the corresponding construct.

execute: generating the logical plan

The parsed statement is then turned into an execution plan:

public void execute() throws Exception {
    execPlan = new StatementPlanner().plan(parsedStmt, context);
}

public ExecPlan plan(StatementBase stmt, ConnectContext session) throws AnalysisException {
    Analyzer.analyze(stmt, session);
    PrivilegeChecker.check(stmt, session);

    if (stmt instanceof QueryStatement) {
        ...
        ExecPlan plan = createQueryPlan(((QueryStatement) stmt).getQueryRelation(), session);
        ...
    } else if (stmt instanceof InsertStmt) {
        ...
        return new InsertPlanner().plan((InsertStmt) stmt, session);
        ...
    } else if (stmt instanceof UpdateStmt) {
        ...
        return new UpdatePlanner().plan(updateStmt, session);
        ...
    }
}

First the statement is analyzed; then a different execution plan is created for each statement type: a query plan, an insert plan, or an update plan.
Inside the Analyzer, each statement type gets its own relational analysis, producing the corresponding Relation graph.
For queries, the QueryAnalyzer mainly does the following (a conceptual illustration follows the list):
1. Bind the SQL statement to the corresponding cluster, database, and table
2. Check the SQL's validity
3. Rewrite the SQL
4. Resolve table and column aliases
5. Assign unique IDs to tuples, slots, and expressions
6. Check the validity of function arguments
7. Substitute expressions
8. Type-check and insert type casts
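
A conceptual illustration of several of these steps (hypothetical, for intuition only):

-- input
select v, cnt from (select v, count(*) cnt from t1 group by v) x where v < 10

-- what analysis does, conceptually:
--   t1 is bound to its database and table               (step 1)
--   alias x and columns v, cnt are resolved             (step 4)
--   tuples, slots and expressions get unique IDs        (step 5)
--   count(*) is checked against function signatures     (step 6)
--   v < 10 is type-checked; literals cast as needed     (step 8)
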
The analyze method of the Analyzer class:

Input: the StatementBase AST
Output: relational algebra as a Relation

public class SelectRelation extends QueryRelation {
    private final Expr predicate;
    private final List<Expr> groupBy;
    private final List<FunctionCallExpr> aggregate;
  ...
    // the child relation
    private final Relation relation;

    private Map<Expr, FieldId> columnReferences;
}

For example, select * from a join (select * from xx where a < 10) b on a.id = b.id is analyzed into a Relation tree.
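
A sketch of the resulting structure (the relation class names follow StarRocks' conventions and are indicative, not verbatim):

QueryStatement
 └─ SelectRelation (output: *)
     └─ JoinRelation (on a.id = b.id)
         ├─ TableRelation (a)
         └─ SubqueryRelation (b)
             └─ SelectRelation (predicate: a < 10)
                 └─ TableRelation (xx)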

createQueryPlan then takes the Relation through to an executable plan:
1. Build the logical execution plan
2. Optimize the SQL
3. Build the fragmented execution plan

QueryRelation query = (QueryRelation) relation;
List<String> colNames = query.getColumnOutputNames();

//1. Build Logical plan
ColumnRefFactory columnRefFactory = new ColumnRefFactory();
LogicalPlan logicalPlan = new RelationTransformer(columnRefFactory, session).transformWithSelectLimit(query);

//2. Optimize logical plan and build physical plan
Optimizer optimizer = new Optimizer();
OptExpression optimizedPlan = optimizer.optimize(
        session,
        logicalPlan.getRoot(),
        new PhysicalPropertySet(),
        new ColumnRefSet(logicalPlan.getOutputColumn()),
        columnRefFactory);

//3. Build fragment exec plan
/*
 * SingleNodeExecPlan is set in TableQueryPlanAction to generate a single-node Plan,
 * currently only used in Spark/Flink Connector
 * Because the connector sends only simple queries, it only needs to remove the output fragment
 */
if (session.getSessionVariable().isSingleNodeExecPlan()) {
    return new PlanFragmentBuilder().createPhysicalPlanWithoutOutputFragment(
            optimizedPlan, session, logicalPlan.getOutputColumn(), columnRefFactory, colNames);
} else {
    return new PlanFragmentBuilder().createPhysicalPlan(
            optimizedPlan, session, logicalPlan.getOutputColumn(), columnRefFactory, colNames);
}

The optimizer is the crucial part: operations such as predicate pushdown all happen inside it.
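
For example, predicate pushdown moves a filter below a join so that rows are dropped before the join instead of after it (an illustrative sketch):

Before optimization:                After optimization:

  Filter(b.v < 10)                    Join(a.id = b.id)
        |                            /                 \
  Join(a.id = b.id)             Scan(a)         Filter(b.v < 10)
   /             \                                      |
 Scan(a)      Scan(b)                                Scan(b)

With that in mind, first look at how the logical plan is built: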

 LogicalPlan logicalPlan = new RelationTransformer(columnRefFactory, session).transformWithSelectLimit(query);

This step converts the Relation tree into operators over the data.

The query select * from a join (select * from xx where a < 10) b on a.id = b.id converts into roughly the following operator tree:
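
LogicalJoinOperator (a.id = b.id)
 ├─ LogicalScanOperator (a)
 └─ LogicalFilterOperator (a < 10)
     └─ LogicalScanOperator (xx)

(A sketch: operator names follow StarRocks' logical operator naming and are indicative; the actual plan also carries projections and column references.)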
