clickhouse 执行sql的过程
标签(空格分隔): clickhouse
上篇讲了 clickhouse-server 的启动过程,其中介绍了 TCPHandler 如何接收客户端请求;这篇我们着重介绍 ClickHouse 在接收到 SQL 之后是如何执行它的。
ClickHouse 执行 SQL 的流程主要分为以下几步:接收 SQL → 解析 SQL 生成 AST → 根据 AST 类型选择 Interpreter 并对 AST 进行分析和优化 → 构建并执行查询计划 → 将结果(BlockIO)返回给客户端。
接收SQL 的过程
runImpl(){
receiveHello();
sendHello(); //建立tcp 连接
// ...初始化context 和state
receivePacket() 接收数据包
//state 对数据包进行初始化等操作
state.io = executeQuery(state.query, *query_context, false, state.stage, may_have_embedded_data); //将state 和context接收到的sql执行
判断是insert 还是select 最后选择是否输出到BlckIO到client
}
下面重点研究 executeQuery,它的实现位于 src/Interpreters/executeQuery.cpp。
executeQuery 内部调用 executeQueryImpl,其主要步骤如下:
// Core steps of executeQueryImpl (excerpt): parse -> choose interpreter -> execute.
ast = parseQuery(parser, begin, end, "", max_query_size, settings.max_parser_depth); // parse the SQL text into an AST
auto interpreter = InterpreterFactory::get(ast, context, stage); // pick the Interpreter matching the AST node type; this step only dispatches, it does not optimize
res = interpreter->execute(); // run the interpreter; NOTE(review): presumably yields a BlockIO (cf. state.io in runImpl) — confirm
return std::make_tuple(ast, std::move(res)); // return both the AST and the execution result to the caller
跟踪 parseQuery 的实现,会进入 tryParseQuery,其中最关键的一行是:
// Inside tryParseQuery: run the top-level parser over the token stream.
// NOTE(review): presumably `res` receives the parsed AST and `expected` collects what was expected on failure — confirm.
bool parse_res = parser.parse(token_iterator, res, expected);
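parser 实现了 IParser 接口:IParserBase 继承自 IParser 并实现了 parse 方法(内部调用 parseImpl),具体的解析器(如 ParserQuery)再继承 IParserBase,只需实现 parseImpl。
查看 ParserQuery::parseImpl 的实现(src/Parsers/ParserQuery.cpp):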
bool ParserQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
    /// Top-level statement dispatcher: try each statement parser in a fixed
    /// order and stop at the first one that succeeds (`node` is the out-param
    /// for the parsed AST). Order matters: the SET ROLE parser is attempted
    /// before the plain SET parser.
    ParserQueryWithOutput with_output_parser(enable_explain);
    ParserInsertQuery insert_parser(end);
    ParserUseQuery use_parser;
    ParserSetQuery set_parser;
    ParserSystemQuery system_parser;
    ParserCreateUserQuery create_user_parser;
    ParserCreateRoleQuery create_role_parser;
    ParserCreateQuotaQuery create_quota_parser;
    ParserCreateRowPolicyQuery create_row_policy_parser;
    ParserCreateSettingsProfileQuery create_settings_profile_parser;
    ParserDropAccessEntityQuery drop_access_entity_parser;
    ParserGrantQuery grant_parser;
    ParserSetRoleQuery set_role_parser;
    ParserExternalDDLQuery external_ddl_parser;

    if (with_output_parser.parse(pos, node, expected))
        return true;
    if (insert_parser.parse(pos, node, expected))
        return true;
    if (use_parser.parse(pos, node, expected))
        return true;
    if (set_role_parser.parse(pos, node, expected)) /// must precede the plain SET parser
        return true;
    if (set_parser.parse(pos, node, expected))
        return true;
    if (system_parser.parse(pos, node, expected))
        return true;
    if (create_user_parser.parse(pos, node, expected))
        return true;
    if (create_role_parser.parse(pos, node, expected))
        return true;
    if (create_quota_parser.parse(pos, node, expected))
        return true;
    if (create_row_policy_parser.parse(pos, node, expected))
        return true;
    if (create_settings_profile_parser.parse(pos, node, expected))
        return true;
    if (drop_access_entity_parser.parse(pos, node, expected))
        return true;
    if (grant_parser.parse(pos, node, expected))
        return true;

    /// Last candidate: its result is the overall result.
    return external_ddl_parser.parse(pos, node, expected);
}
可以看到,顶层解析器会按顺序依次尝试以上罗列的各类语句解析器,第一个解析成功的结果即为最终的 AST。
SELECT 语句由 ParserQueryWithOutput 负责解析,最终会调用到 ParserSelectQuery。我们来看看 src/Parsers/ParserSelectQuery.cpp 中 ParserSelectQuery 的具体实现:
/// Parse a SELECT statement into an ASTSelectQuery (excerpt: the clause-by-clause
/// parsing is elided at the `....` line below).
/// `node` is set to the new ASTSelectQuery up front; each clause is collected into
/// its own ASTPtr local and attached to the node via setExpression at the end.
bool ParserSelectQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
auto select_query = std::make_shared<ASTSelectQuery>();
node = select_query;
/// Keyword parsers for every clause the SELECT grammar may contain.
ParserKeyword s_select("SELECT");
ParserKeyword s_distinct("DISTINCT");
ParserKeyword s_from("FROM");
ParserKeyword s_prewhere("PREWHERE");
ParserKeyword s_where("WHERE");
ParserKeyword s_group_by("GROUP BY");
ParserKeyword s_with("WITH");
ParserKeyword s_totals("TOTALS");
ParserKeyword s_having("HAVING");
ParserKeyword s_order_by("ORDER BY");
ParserKeyword s_limit("LIMIT");
ParserKeyword s_settings("SETTINGS");
ParserKeyword s_by("BY");
ParserKeyword s_rollup("ROLLUP");
ParserKeyword s_cube("CUBE");
ParserKeyword s_top("TOP");
ParserKeyword s_with_ties("WITH TIES");
ParserKeyword s_offset("OFFSET");
/// Expression-list parsers used for the clause bodies.
ParserNotEmptyExpressionList exp_list(false);
ParserNotEmptyExpressionList exp_list_for_with_clause(false);
ParserNotEmptyExpressionList exp_list_for_select_clause(true); /// Allows aliases without AS keyword.
ParserExpressionWithOptionalAlias exp_elem(false);
ParserOrderByExpressionList order_list;
ParserToken open_bracket(TokenType::OpeningRoundBracket);
ParserToken close_bracket(TokenType::ClosingRoundBracket);
/// One slot per clause; NOTE(review): presumably left null when the clause is absent — confirm in the elided code.
ASTPtr with_expression_list;
ASTPtr select_expression_list;
ASTPtr tables;
ASTPtr prewhere_expression;
ASTPtr where_expression;
ASTPtr group_expression_list;
ASTPtr having_expression;
ASTPtr order_expression_list;
ASTPtr limit_by_length;
ASTPtr limit_by_offset;
ASTPtr limit_by_expression_list;
ASTPtr limit_offset;
ASTPtr limit_length;
ASTPtr top_length;
ASTPtr settings;
/// WITH expr list. The (elided) code below walks the SQL in order, checking for each
/// clause keyword declared above and parsing the clause body into the matching ASTPtr.
....
/// Attach every collected clause to the select node.
select_query->setExpression(ASTSelectQuery::Expression::WITH, std::move(with_expression_list));
select_query->setExpression(ASTSelectQuery::Expression::SELECT, std::move(select_expression_list));
select_query->setExpression(ASTSelectQuery::Expression::TABLES, std::move(tables));
select_query->setExpression(ASTSelectQuery::Expression::PREWHERE, std::move(prewhere_expression));
select_query->setExpression(ASTSelectQuery::Expression::WHERE, std::move(where_expression));
select_query->setExpression(ASTSelectQuery::Expression::GROUP_BY, std::move(group_expression_list));
select_query->setExpression(ASTSelectQuery::Expression::HAVING, std::move(having_expression));
select_query->setExpression(ASTSelectQuery::Expression::ORDER_BY, std::move(order_expression_list));
select_query->setExpression(ASTSelectQuery::Expression::LIMIT_BY_OFFSET, std::move(limit_by_offset));
select_query->setExpression(ASTSelectQuery::Expression::LIMIT_BY_LENGTH, std::move(limit_by_length));
select_query->setExpression(ASTSelectQuery::Expression::LIMIT_BY, std::move(limit_by_expression_list));
select_query->setExpression(ASTSelectQuery::Expression::LIMIT_OFFSET, std::move(limit_offset));
select_query->setExpression(ASTSelectQuery::Expression::LIMIT_LENGTH, std::move(limit_length));
select_query->setExpression(ASTSelectQuery::Expression::SETTINGS, std::move(settings));
return true;
}
可以看到,解析器识别 SELECT 各子句的关键字,并把每个子句组装成对应的 AST 节点。AST 树组装完成后,InterpreterFactory 会根据 AST 的类型创建对应的 Interpreter,再由它对树进行分析和优化:
std::unique_ptr<IInterpreter> InterpreterFactory::get(ASTPtr & query, Context & context, QueryProcessingStage::Enum stage)
{
    /// Choose the Interpreter that will execute `query`, based solely on the
    /// concrete AST node type. Every call counts one ProfileEvents::Query;
    /// the union-SELECT and INSERT branches additionally bump their own counters.
    /// Throws UNKNOWN_TYPE_OF_QUERY when no interpreter matches.
    ProfileEvents::increment(ProfileEvents::Query);

    /// Bare SELECT: the internal part of ASTSelectWithUnionQuery (even a SELECT
    /// without UNION is represented as a union node with a single child).
    /// This branch does not count a SelectQuery event.
    if (query->as<ASTSelectQuery>())
        return std::make_unique<InterpreterSelectQuery>(query, context, SelectQueryOptions(stage));

    if (query->as<ASTSelectWithUnionQuery>())
    {
        ProfileEvents::increment(ProfileEvents::SelectQuery);
        return std::make_unique<InterpreterSelectWithUnionQuery>(query, context, SelectQueryOptions(stage));
    }

    if (query->as<ASTInsertQuery>())
    {
        ProfileEvents::increment(ProfileEvents::InsertQuery);
        const bool allow_materialized = static_cast<bool>(context.getSettingsRef().insert_allow_materialized_columns);
        return std::make_unique<InterpreterInsertQuery>(query, context, allow_materialized);
    }

    if (query->as<ASTCreateQuery>())
        return std::make_unique<InterpreterCreateQuery>(query, context);
    if (query->as<ASTDropQuery>())
        return std::make_unique<InterpreterDropQuery>(query, context);
    if (query->as<ASTRenameQuery>())
        return std::make_unique<InterpreterRenameQuery>(query, context);
    if (query->as<ASTShowTablesQuery>())
        return std::make_unique<InterpreterShowTablesQuery>(query, context);
    if (query->as<ASTUseQuery>())
        return std::make_unique<InterpreterUseQuery>(query, context);
    if (query->as<ASTSetQuery>())
        return std::make_unique<InterpreterSetQuery>(query, context); /// readonly is checked inside InterpreterSetQuery
    if (query->as<ASTSetRoleQuery>())
        return std::make_unique<InterpreterSetRoleQuery>(query, context);
    if (query->as<ASTOptimizeQuery>())
        return std::make_unique<InterpreterOptimizeQuery>(query, context);

    /// EXISTS TABLE and EXISTS DICTIONARY share one interpreter.
    if (query->as<ASTExistsTableQuery>() || query->as<ASTExistsDictionaryQuery>())
        return std::make_unique<InterpreterExistsQuery>(query, context);

    /// SHOW CREATE TABLE / DATABASE / DICTIONARY share one interpreter.
    if (query->as<ASTShowCreateTableQuery>()
        || query->as<ASTShowCreateDatabaseQuery>()
        || query->as<ASTShowCreateDictionaryQuery>())
        return std::make_unique<InterpreterShowCreateQuery>(query, context);

    if (query->as<ASTDescribeQuery>())
        return std::make_unique<InterpreterDescribeQuery>(query, context);
    if (query->as<ASTExplainQuery>())
        return std::make_unique<InterpreterExplainQuery>(query, context);
    if (query->as<ASTShowProcesslistQuery>())
        return std::make_unique<InterpreterShowProcesslistQuery>(query, context);
    if (query->as<ASTAlterQuery>())
        return std::make_unique<InterpreterAlterQuery>(query, context);
    if (query->as<ASTCheckQuery>())
        return std::make_unique<InterpreterCheckQuery>(query, context);
    if (query->as<ASTKillQueryQuery>())
        return std::make_unique<InterpreterKillQueryQuery>(query, context);
    if (query->as<ASTSystemQuery>())
        return std::make_unique<InterpreterSystemQuery>(query, context);
    if (query->as<ASTWatchQuery>())
        return std::make_unique<InterpreterWatchQuery>(query, context);

    if (query->as<ASTCreateUserQuery>())
        return std::make_unique<InterpreterCreateUserQuery>(query, context);
    if (query->as<ASTCreateRoleQuery>())
        return std::make_unique<InterpreterCreateRoleQuery>(query, context);
    if (query->as<ASTCreateQuotaQuery>())
        return std::make_unique<InterpreterCreateQuotaQuery>(query, context);
    if (query->as<ASTCreateRowPolicyQuery>())
        return std::make_unique<InterpreterCreateRowPolicyQuery>(query, context);
    if (query->as<ASTCreateSettingsProfileQuery>())
        return std::make_unique<InterpreterCreateSettingsProfileQuery>(query, context);
    if (query->as<ASTDropAccessEntityQuery>())
        return std::make_unique<InterpreterDropAccessEntityQuery>(query, context);
    if (query->as<ASTGrantQuery>())
        return std::make_unique<InterpreterGrantQuery>(query, context);
    if (query->as<ASTShowCreateAccessEntityQuery>())
        return std::make_unique<InterpreterShowCreateAccessEntityQuery>(query, context);
    if (query->as<ASTShowGrantsQuery>())
        return std::make_unique<InterpreterShowGrantsQuery>(query, context);
    if (query->as<ASTShowAccessEntitiesQuery>())
        return std::make_unique<InterpreterShowAccessEntitiesQuery>(query, context);
    if (query->as<ASTShowAccessQuery>())
        return std::make_unique<InterpreterShowAccessQuery>(query, context);
    if (query->as<ASTShowPrivilegesQuery>())
        return std::make_unique<InterpreterShowPrivilegesQuery>(query, context);
    if (query->as<ASTExternalDDLQuery>())
        return std::make_unique<InterpreterExternalDDLQuery>(query, context);

    throw Exception("Unknown type of query: " + query->getID(), ErrorCodes::UNKNOWN_TYPE_OF_QUERY);
}
上面的代码根据 AST 节点的类型创建对应的 Interpreter,由它负责具体执行该 AST 对应的查询。下面来看最复杂的 SELECT。
InterpreterSelectQuery 的构造函数 InterpreterSelectQuery::InterpreterSelectQuery() 实现如下(节选):
/// InterpreterSelectQuery constructor (excerpt; the article cuts the body off after the `analyze` lambda).
/// Member init clones the query unless options.modify_inplace is set, and gives the
/// interpreter its own copy of the caller's Context.
InterpreterSelectQuery::InterpreterSelectQuery(
const ASTPtr & query_ptr_,
const Context & context_,
const BlockInputStreamPtr & input_,
std::optional<Pipe> input_pipe_,
const StoragePtr & storage_,
const SelectQueryOptions & options_,
const Names & required_result_column_names,
const StorageMetadataPtr & metadata_snapshot_)
: options(options_)
/// NOTE: the query almost always should be cloned because it will be modified during analysis.
, query_ptr(options.modify_inplace ? query_ptr_ : query_ptr_->clone())
, context(std::make_shared<Context>(context_))
, storage(storage_)
, input(input_)
, input_pipe(std::move(input_pipe_))
, log(&Poco::Logger::get("InterpreterSelectQuery"))
, metadata_snapshot(metadata_snapshot_)
{
/// Analysis step wrapped in a lambda:
/// 1) TreeRewriter::analyzeSelect analyzes (and may rewrite) query_ptr, producing syntax_analyzer_result;
/// 2) a SelectQueryExpressionAnalyzer is built from that result.
/// NOTE(review): try_move_to_prewhere is not used in this excerpt; presumably it controls
/// moving WHERE conditions into PREWHERE — confirm against upstream.
auto analyze = [&] (bool try_move_to_prewhere)
{
syntax_analyzer_result = TreeRewriter(*context).analyzeSelect(
query_ptr,
TreeRewriterResult(source_header.getNamesAndTypesList(), storage, metadata_snapshot),
options, joined_tables.tablesWithColumns(), required_result_column_names, table_join);
query_analyzer = std::make_unique<SelectQueryExpressionAnalyzer>(
query_ptr, syntax_analyzer_result, *context, metadata_snapshot,
NameSet(required_result_column_names.begin(), required_result_column_names.end()),
!options.only_analyze, options, std::move(subquery_for_sets));
}
重点是 TreeRewriter 对当前 AST 进行的分析和优化;之后 SelectQueryExpressionAnalyzer 基于分析结果组织查询所需的表达式,供后续构造查询计划。
看看它是如何分析和优化 AST 树的:
/// Run the SELECT analysis/rewrite passes over `query` and return the collected TreeRewriterResult.
/// `query` is taken by non-const reference and is handed to several passes, so it may be modified in place.
/// The pass order is significant (see the "Must be after ... and before ..." note below).
/// Throws LOGICAL_ERROR if `query` is not an ASTSelectQuery.
TreeRewriterResultPtr TreeRewriter::analyzeSelect(
ASTPtr & query,
TreeRewriterResult && result,
const SelectQueryOptions & select_options,
const std::vector<TableWithColumnNamesAndTypes> & tables_with_columns,
const Names & required_result_columns,
std::shared_ptr<TableJoin> table_join) const
{
auto * select_query = query->as<ASTSelectQuery>();
if (!select_query)
throw Exception("Select analyze for not select asts.", ErrorCodes::LOGICAL_ERROR);
size_t subquery_depth = select_options.subquery_depth;
bool remove_duplicates = select_options.remove_duplicates;
const auto & settings = context.getSettingsRef();
const NameSet & source_columns_set = result.source_columns_set;
/// Reuse the caller's TableJoin (clearing what it collected before) or fall back to an empty one.
if (table_join)
{
result.analyzed_join = table_join;
result.analyzed_join->resetCollected();
}
else /// TODO: remove. For now ExpressionAnalyzer expects some not empty object here
result.analyzed_join = std::make_shared<TableJoin>();
if (remove_duplicates)
renameDuplicatedColumns(select_query); // NOTE(review): judging by its name, this renames duplicated column names in the SELECT list; it does not remove duplicate rows
/// With more than one table, record the second (joined) table's columns on the join
/// and de-duplicate/qualify them against the source columns (per the helper's name — confirm).
if (tables_with_columns.size() > 1)
{
result.analyzed_join->columns_from_joined_table = tables_with_columns[1].columns;
result.analyzed_join->deduplicateAndQualifyColumnNames(
source_columns_set, tables_with_columns[1].table.getQualifiedNamePrefix());
}
translateQualifiedNames(query, *select_query, source_columns_set, tables_with_columns); // resolve qualified column names against the known tables (per the helper's name — confirm)
/// Optimizes logical expressions.
LogicalExpressionsOptimizer(select_query, settings.optimize_min_equality_disjunction_chain_length.value).perform(); // chain-length threshold comes from the optimize_min_equality_disjunction_chain_length setting
normalize(query, result.aliases, settings);
/// Remove unneeded columns according to 'required_result_columns'.
/// Leave all selected columns in case of DISTINCT; columns that contain arrayJoin function inside.
/// Must be after 'normalizeTree' (after expanding aliases, for aliases not get lost)
/// and before 'executeScalarSubqueries', 'analyzeAggregation', etc. to avoid excessive calculations.
removeUnneededColumnsFromSelectClause(select_query, required_result_columns, remove_duplicates);
/// Executing scalar subqueries - replacing them with constant values.
executeScalarSubqueries(query, context, subquery_depth, result.scalars, select_options.only_analyze); // results are collected into result.scalars
TreeOptimizer::apply(query, result.aliases, source_columns_set, tables_with_columns, context, result.rewrite_subqueries);
/// array_join_alias_to_name, array_join_result_to_source.
getArrayJoinedColumns(query, result, select_query, result.source_columns, source_columns_set);
setJoinStrictness(*select_query, settings.join_default_strictness, settings.any_join_distinct_right_table_keys,
result.analyzed_join->table_join);
collectJoinedColumns(*result.analyzed_join, *select_query, tables_with_columns, result.aliases);
result.aggregates = getAggregates(query, *select_query);
result.collectUsedColumns(query, true);
result.ast_join = select_query->join();
/// optimize_trivial_count stays set only if the optimize_trivial_count_query setting is on, the query has
/// no WHERE, PREWHERE, GROUP BY, HAVING, SAMPLE or FINAL, and it reads fewer than two tables or uses a LEFT join.
if (result.optimize_trivial_count)
result.optimize_trivial_count = settings.optimize_trivial_count_query &&
!select_query->where() && !select_query->prewhere() && !select_query->groupBy() && !select_query->having() &&
!select_query->sampleSize() && !select_query->sampleOffset() && !select_query->final() &&
(tables_with_columns.size() < 2 || isLeft(result.analyzed_join->kind()));
return std::make_shared<const TreeRewriterResult>(result);
}
谓词下推:将 WHERE 条件下推到子查询中,减少需要读取和处理的数据量,从而加速查询。
标量子查询替换(executeScalarSubqueries):先执行标量子查询,再用其结果常量替换原子查询,以减少计算开销。
query_analyzer(SelectQueryExpressionAnalyzer)主要负责把 SELECT、ORDER BY、LIMIT 等各个子句所需的表达式组织在一起。
最后是 InterpreterSelectQuery::executeImpl,
它的主体流程(简化后)很直接:
executeFetchColumns()
executeWhere(query_plan, expressions.before_where, expressions.remove_where_filter);
executeAggregation
executeOrder
executeLimitBy
其实就是按顺序向 query_plan 中逐步添加各个处理步骤并执行,最后将结果转换为 BlockIO。
最终得到的 BlockIO 流会交回给 Server(TCPHandler),Server 将数据写入缓冲区,再发送给客户端。
本文由 妖言君 创作,采用 知识共享署名4.0 国际许可协议进行许可
本站文章除注明转载/出处外,均为本站原创或翻译,转载前请务必署名
最后编辑时间为: Mar 21, 2022 at 05:00 pm