clickhouse
clickhouse 的启动的过程
clickhouse-server 的启动入口位于 programs/server/clickhouse-server.cpp 的 main 方法中,实际执行的是下面的 mainEntryClickHouseServer。
/// Entry point of clickhouse-server: constructs the DB::Server application and runs it.
/// Any exception escaping run() is written to stderr and converted into a non-zero exit code.
int mainEntryClickHouseServer(int argc, char ** argv)
{
    DB::Server app;

    try
    {
        return app.run(argc, argv);
    }
    catch (...)
    {
        std::cerr << DB::getCurrentExceptionMessage(true) << "\n";

        const auto exception_code = DB::getCurrentExceptionCode();
        if (exception_code)
            return exception_code;

        /// An exception was caught, so never report success even when its code is 0.
        return 1;
    }
}
看看DB::Server 的方法
server 的run 方法
/// Handles the informational command-line options ("help", "version") and returns 0 for them;
/// otherwise hands control over to Application::run().
int Server::run()
{
    if (config().hasOption("help"))
    {
        Poco::Util::HelpFormatter help_formatter(Server::options());

        /// The header consists of the usage line followed by a hint about positional arguments.
        std::stringstream header;
        header << commandName() << " [OPTION] [-- [ARG]...]\n"
               << "positional arguments can be used to rewrite config.xml properties, for example, --http_port=8010";

        help_formatter.setHeader(header.str());
        help_formatter.format(std::cout);
        return 0;
    }

    if (config().hasOption("version"))
    {
        std::cout << DBMS_NAME << " server version " << VERSION_STRING << VERSION_OFFICIAL << "." << std::endl;
        return 0;
    }

    /// Neither option was given: continue with the regular application start-up.
    return Application::run(); // NOLINT
}
很显然,最后调用的是父类 Application 的 run 方法:
// Runs the application: initialize(), then main() with the unprocessed arguments,
// then uninitialize(). Exceptions thrown by main() are logged and not propagated.
int Application::run()
{
    int exitCode = EXIT_CONFIG;
    initialize(*this);

    try
    {
        // Pre-set the failure code: it is what gets returned if main() throws.
        exitCode = EXIT_SOFTWARE;
        exitCode = main(_unprocessedArgs);
    }
    catch (Poco::Exception& exc)
    {
        logger().log(exc);
    }
    catch (std::exception& exc)
    {
        logger().error(exc.what());
    }
    catch (...)
    {
        logger().fatal("system exception");
    }

    uninitialize();
    return exitCode;
}
可以看到里边有个main 方法
// Base-class implementation of main(): ignores its arguments and just returns EXIT_OK.
int Application::main(const ArgVec& args)
{
return EXIT_OK;
}
奇怪的是,这个 main 方法只是直接返回 EXIT_OK,所以它肯定不是真正执行启动逻辑的方法。再看一下类的继承关系:
class Server : public BaseDaemon, public IServer
class BaseDaemon : public Poco::Util::ServerApplication, public Loggers
class Util_API ServerApplication: public Application
Server -> BaseDaemon -> ServerApplication -> Application
所以 Server 的 main 方法覆盖了父类 Application 的 main 方法,它才是真正执行启动逻辑的 main 方法。
/// Server start-up routine (excerpt: the part marked `......` below is elided).
/// The visible code registers functions and engines, creates one listener per configured
/// port for every listen host, starts the listeners, installs the shutdown sequence and
/// then waits in waitForTerminationRequest().
int Server::main(const std::vector<std::string> & /*args*/)
{
Poco::Logger * log = &logger();
UseSSL use_ssl;
ThreadStatus thread_status;
/// Register functions, aggregate functions, table functions, storages, dictionaries and disks
/// (English rendering of the annotation on the next line).
# 注册函数方法和引擎
registerFunctions();
registerAggregateFunctions();
registerTableFunctions();
registerStorages();
registerDictionaries();
registerDisks();
......
/// Create the listeners: every protocol below is attempted for every listen host.
for (const auto & listen_host : listen_hosts)
{
/// Helper shared by all protocols: does nothing when `port_name` is absent from the config,
/// otherwise reads the port and calls func(port). A Poco::Exception thrown by func is
/// logged as a warning when listen_try is set and rethrown as NETWORK_ERROR otherwise.
auto create_server = [&](const char * port_name, auto && func)
{
/// For testing purposes, user may omit tcp_port or http_port or https_port in configuration file.
if (!config().has(port_name))
return;
auto port = config().getInt(port_name);
try
{
func(port);
}
catch (const Poco::Exception &)
{
std::string message = "Listen [" + listen_host + "]:" + std::to_string(port) + " failed: " + getCurrentExceptionMessage(false);
if (listen_try)
{
LOG_WARNING(log, "{}. If it is an IPv6 or IPv4 address and your host has disabled IPv6 or IPv4, then consider to "
"specify not disabled IPv4 or IPv6 address to listen in <listen_host> element of configuration "
"file. Example for disabled IPv6: <listen_host>0.0.0.0</listen_host> ."
" Example for disabled IPv4: <listen_host>::</listen_host>",
message);
}
else
{
throw Exception{message, ErrorCodes::NETWORK_ERROR};
}
}
};
/// HTTP
create_server("http_port", [&](UInt16 port)
{
Poco::Net::ServerSocket socket;
auto address = socket_bind_listen(socket, listen_host, port);
socket.setReceiveTimeout(settings.http_receive_timeout);
socket.setSendTimeout(settings.http_send_timeout);
servers.emplace_back(std::make_unique<Poco::Net::HTTPServer>(
createHandlerFactory(*this, async_metrics, "HTTPHandler-factory"), server_pool, socket, http_params));
LOG_INFO(log, "Listening for http://{}", address.toString());
});
/// HTTPS
create_server("https_port", [&](UInt16 port)
{
#if USE_SSL
Poco::Net::SecureServerSocket socket;
auto address = socket_bind_listen(socket, listen_host, port, /* secure = */ true);
socket.setReceiveTimeout(settings.http_receive_timeout);
socket.setSendTimeout(settings.http_send_timeout);
servers.emplace_back(std::make_unique<Poco::Net::HTTPServer>(
createHandlerFactory(*this, async_metrics, "HTTPSHandler-factory"), server_pool, socket, http_params));
LOG_INFO(log, "Listening for https://{}", address.toString());
#else
UNUSED(port);
throw Exception{"HTTPS protocol is disabled because Poco library was built without NetSSL support.",
ErrorCodes::SUPPORT_IS_DISABLED};
#endif
});
/// TCP
create_server("tcp_port", [&](UInt16 port)
{
Poco::Net::ServerSocket socket;
auto address = socket_bind_listen(socket, listen_host, port);
socket.setReceiveTimeout(settings.receive_timeout);
socket.setSendTimeout(settings.send_timeout);
servers.emplace_back(std::make_unique<Poco::Net::TCPServer>(
new TCPHandlerFactory(*this),
server_pool,
socket,
new Poco::Net::TCPServerParams));
LOG_INFO(log, "Listening for connections with native protocol (tcp): {}", address.toString());
});
/// TCP with SSL
create_server("tcp_port_secure", [&](UInt16 port)
{
#if USE_SSL
Poco::Net::SecureServerSocket socket;
auto address = socket_bind_listen(socket, listen_host, port, /* secure = */ true);
socket.setReceiveTimeout(settings.receive_timeout);
socket.setSendTimeout(settings.send_timeout);
servers.emplace_back(std::make_unique<Poco::Net::TCPServer>(
new TCPHandlerFactory(*this, /* secure= */ true),
server_pool,
socket,
new Poco::Net::TCPServerParams));
LOG_INFO(log, "Listening for connections with secure native protocol (tcp_secure): {}", address.toString());
#else
UNUSED(port);
throw Exception{"SSL support for TCP protocol is disabled because Poco library was built without NetSSL support.",
ErrorCodes::SUPPORT_IS_DISABLED};
#endif
});
/// Interserver IO HTTP
create_server("interserver_http_port", [&](UInt16 port)
{
Poco::Net::ServerSocket socket;
auto address = socket_bind_listen(socket, listen_host, port);
socket.setReceiveTimeout(settings.http_receive_timeout);
socket.setSendTimeout(settings.http_send_timeout);
servers.emplace_back(std::make_unique<Poco::Net::HTTPServer>(
createHandlerFactory(*this, async_metrics, "InterserverIOHTTPHandler-factory"), server_pool, socket, http_params));
LOG_INFO(log, "Listening for replica communication (interserver): http://{}", address.toString());
});
/// Interserver IO HTTPS
create_server("interserver_https_port", [&](UInt16 port)
{
#if USE_SSL
Poco::Net::SecureServerSocket socket;
auto address = socket_bind_listen(socket, listen_host, port, /* secure = */ true);
socket.setReceiveTimeout(settings.http_receive_timeout);
socket.setSendTimeout(settings.http_send_timeout);
servers.emplace_back(std::make_unique<Poco::Net::HTTPServer>(
createHandlerFactory(*this, async_metrics, "InterserverIOHTTPSHandler-factory"), server_pool, socket, http_params));
LOG_INFO(log, "Listening for secure replica communication (interserver): https://{}", address.toString());
#else
UNUSED(port);
throw Exception{"SSL support for TCP protocol is disabled because Poco library was built without NetSSL support.",
ErrorCodes::SUPPORT_IS_DISABLED};
#endif
});
/// MySQL compatibility protocol
/// NOTE(review): a plain ServerSocket is bound with `secure = true` here and for PostgreSQL below - confirm this is intended.
create_server("mysql_port", [&](UInt16 port)
{
Poco::Net::ServerSocket socket;
auto address = socket_bind_listen(socket, listen_host, port, /* secure = */ true);
socket.setReceiveTimeout(Poco::Timespan());
socket.setSendTimeout(settings.send_timeout);
servers.emplace_back(std::make_unique<Poco::Net::TCPServer>(
new MySQLHandlerFactory(*this),
server_pool,
socket,
new Poco::Net::TCPServerParams));
LOG_INFO(log, "Listening for MySQL compatibility protocol: {}", address.toString());
});
/// PostgreSQL compatibility protocol
create_server("postgresql_port", [&](UInt16 port)
{
Poco::Net::ServerSocket socket;
auto address = socket_bind_listen(socket, listen_host, port, /* secure = */ true);
socket.setReceiveTimeout(Poco::Timespan());
socket.setSendTimeout(settings.send_timeout);
servers.emplace_back(std::make_unique<Poco::Net::TCPServer>(
new PostgreSQLHandlerFactory(*this),
server_pool,
socket,
new Poco::Net::TCPServerParams));
LOG_INFO(log, "Listening for PostgreSQL compatibility protocol: " + address.toString());
});
/// Prometheus (if defined and not setup yet with http_port)
create_server("prometheus.port", [&](UInt16 port)
{
Poco::Net::ServerSocket socket;
auto address = socket_bind_listen(socket, listen_host, port);
socket.setReceiveTimeout(settings.http_receive_timeout);
socket.setSendTimeout(settings.http_send_timeout);
servers.emplace_back(std::make_unique<Poco::Net::HTTPServer>(
createHandlerFactory(*this, async_metrics, "PrometheusHandler-factory"), server_pool, socket, http_params));
LOG_INFO(log, "Listening for Prometheus: http://{}", address.toString());
});
}
/// Refuse to continue when not a single listener could be created.
if (servers.empty())
throw Exception("No servers started (add valid listen_host and 'tcp_port' or 'http_port' to configuration file.)",
ErrorCodes::NO_ELEMENTS_IN_CONFIG);
global_context->enableNamedSessions();
/// Start every listener created above.
for (auto & server : servers)
server->start();
{
/// Level for the text log; INT_MAX is used when "text_log.level" is not configured.
String level_str = config().getString("text_log.level", "");
int level = level_str.empty() ? INT_MAX : Poco::Logger::parseLevel(level_str);
setTextLog(global_context->getTextLog(), level);
}
buildLoggers(config(), logger());
main_config_reloader->start();
access_control.startPeriodicReloadingUsersConfigs();
if (dns_cache_updater)
dns_cache_updater->start();
{
LOG_INFO(log, "Available RAM: {}; physical cores: {}; logical cores: {}.",
formatReadableSizeWithBinarySuffix(memory_amount),
getNumberOfPhysicalCPUCores(), // on ARM processors it can show only enabled at current moment cores
std::thread::hardware_concurrency());
}
LOG_INFO(log, "Ready for connections.");
/// Shutdown sequence. NOTE(review): SCOPE_EXIT presumably runs this body when the enclosing scope is left - confirm.
SCOPE_EXIT({
LOG_DEBUG(log, "Received termination signal.");
LOG_DEBUG(log, "Waiting for current connections to close.");
is_cancelled = true;
int current_connections = 0;
for (auto & server : servers)
{
server->stop();
current_connections += server->currentConnections();
}
if (current_connections)
LOG_INFO(log, "Closed all listening sockets. Waiting for {} outstanding connections.", current_connections);
else
LOG_INFO(log, "Closed all listening sockets.");
/// Killing remaining queries.
global_context->getProcessList().killAllQueries();
if (current_connections)
{
/// Wait up to "shutdown_wait_unfinished" seconds (5 when not configured), re-counting connections every 100 ms.
const int sleep_max_ms = 1000 * config().getInt("shutdown_wait_unfinished", 5);
const int sleep_one_ms = 100;
int sleep_current_ms = 0;
while (sleep_current_ms < sleep_max_ms)
{
current_connections = 0;
for (auto & server : servers)
current_connections += server->currentConnections();
if (!current_connections)
break;
sleep_current_ms += sleep_one_ms;
std::this_thread::sleep_for(std::chrono::milliseconds(sleep_one_ms));
}
}
if (current_connections)
LOG_INFO(log, "Closed connections. But {} remain."
" Tip: To increase wait time add to config: <shutdown_wait_unfinished>60</shutdown_wait_unfinished>", current_connections);
else
LOG_INFO(log, "Closed connections.");
dns_cache_updater.reset();
main_config_reloader.reset();
if (current_connections)
{
/// There is no better way to force connections to close in Poco.
/// Otherwise connection handlers will continue to live
/// (they are effectively dangling objects, but they use global thread pool
/// and global thread pool destructor will wait for threads, preventing server shutdown).
/// Dump coverage here, because std::atexit callback would not be called.
dumpCoverageReportIfPossible();
LOG_INFO(log, "Will shutdown forcefully.");
_exit(Application::EXIT_OK);
}
});
/// try to load dictionaries immediately, throw on error and die
ext::scope_guard dictionaries_xmls, models_xmls;
try
{
if (!config().getBool("dictionaries_lazy_load", true))
{
global_context->tryCreateEmbeddedDictionaries();
global_context->getExternalDictionariesLoader().enableAlwaysLoadEverything(true);
}
dictionaries_xmls = global_context->getExternalDictionariesLoader().addConfigRepository(
std::make_unique<ExternalLoaderXMLConfigRepository>(config(), "dictionaries_config"));
models_xmls = global_context->getExternalModelsLoader().addConfigRepository(
std::make_unique<ExternalLoaderXMLConfigRepository>(config(), "models_config"));
}
catch (...)
{
LOG_ERROR(log, "Caught exception while loading dictionaries.");
throw;
}
/// One MetricsTransmitter per "graphite" key found in the config.
std::vector<std::unique_ptr<MetricsTransmitter>> metrics_transmitters;
for (const auto & graphite_key : DB::getMultipleKeysFromConfig(config(), "", "graphite"))
{
metrics_transmitters.emplace_back(std::make_unique<MetricsTransmitter>(
global_context->getConfigRef(), graphite_key, async_metrics));
}
/// NOTE(review): presumably blocks until a termination request arrives - confirm.
waitForTerminationRequest();
/// NOTE(review): the brace below has no visible opening brace; it presumably closes a scope opened in the elided part.
}
return Application::EXIT_OK;
}
可以看到,一开始是注册各种函数和引擎,接着根据配置文件初始化相关配置,最后创建 TCP、HTTP 等各种服务。
创建完成后再逐个启动server
for (auto & server : servers)
server->start();
->
// 调用的是 TCPServer 的 start 方法
// Starts the server: asserts that it is currently stopped, clears the stopped flag
// and starts _thread with this object as its target.
void TCPServer::start()
{
poco_assert (_stopped);
_stopped = false;
_thread.start(*this);
}
TCPServer 本身就是线程的执行体:start 中的 _thread.start(*this) 会在新线程里执行下面的 run 方法
void TCPServer::run()
{
while (!_stopped)
{
Poco::Timespan timeout(250000);
try
{
if (_socket.poll(timeout, Socket::SELECT_READ))
{
try
{
StreamSocket ss = _socket.acceptConnection();
if (!_pConnectionFilter || _pConnectionFilter->accept(ss))
{
// enable nodelay per default: OSX really needs that
#if defined(POCO_OS_FAMILY_UNIX)
if (ss.address().family() != AddressFamily::UNIX_LOCAL)
#endif
{
ss.setNoDelay(true);
}
_pDispatcher->enqueue(ss);
}
}
catch (Poco::Exception& exc)
{
ErrorHandler::handle(exc);
}
catch (std::exception& exc)
{
ErrorHandler::handle(exc);
}
catch (...)
{
ErrorHandler::handle();
}
}
}
catch (Poco::Exception& exc)
{
ErrorHandler::handle(exc);
// possibly a resource issue since poll() failed;
// give some time to recover before trying again
Poco::Thread::sleep(50);
}
}
}
真正的工作在 run 方法中完成。可以看到每接受一个新连接,就会通过 _pDispatcher->enqueue(ss) 把该连接的 socket 放入队列,后续由调度器的工作线程不断从队列中取出连接并处理。
取出并处理连接的方法在 _pDispatcher 所属的 TCPServerDispatcher 类中
void TCPServerDispatcher::run()
{
AutoPtr<TCPServerDispatcher> guard(this, true); // ensure object stays alive
int idleTime = (int) _pParams->getThreadIdleTime().totalMilliseconds();
for (;;)
{
{
ThreadCountWatcher tcw(this);
try
{
AutoPtr<Notification> pNf = _queue.waitDequeueNotification(idleTime);
if (pNf)
{
TCPConnectionNotification* pCNf = dynamic_cast<TCPConnectionNotification*>(pNf.get());
if (pCNf)
{
std::unique_ptr<TCPServerConnection> pConnection(_pConnectionFactory->createConnection(pCNf->socket()));
poco_check_ptr(pConnection.get());
beginConnection();
pConnection->start();
endConnection();
}
}
}
catch (Poco::Exception &exc) { ErrorHandler::handle(exc); }
catch (std::exception &exc) { ErrorHandler::handle(exc); }
catch (...) { ErrorHandler::handle(); }
}
if (_stopped || (_currentThreads > 1 && _queue.empty())) break;
}
}
重点看下 _pConnectionFactory->createConnection(pCNf->socket())
很明显这是一个工厂:根据传入的 socket 创建对应的连接处理对象
// TCPHandlerFactory
/// Factory method: creates the connection object for the given socket.
/// The body of the NetException handler is elided in this excerpt (`...`).
Poco::Net::TCPServerConnection * createConnection(const Poco::Net::StreamSocket & socket) override
{
try
{
// What actually gets created is a TCPHandler
return new TCPHandler(server, socket);
}
catch (const Poco::Net::NetException &)
{
...
}
}
TCPHandler 的 run 方法(上面的 pConnection->start() 最终会执行到它)
/// Runs the connection by calling runImpl(). A Poco::Exception whose what() is "Timeout"
/// is only logged at debug level; every other Poco::Exception is rethrown to the caller.
void TCPHandler::run()
{
    try
    {
        runImpl();
        LOG_INFO(log, "Done processing connection.");
    }
    catch (Poco::Exception & e)
    {
        /// Anything other than a timeout is a real error: propagate it.
        if (strcmp(e.what(), "Timeout") != 0)
            throw;

        /// Timeout - not an error.
        LOG_DEBUG(log, "Poco::Exception. Code: {}, e.code() = {}, e.displayText() = {}, e.what() = {}", ErrorCodes::POCO_EXCEPTION, e.code(), e.displayText(), e.what());
    }
}
继续看runImpl
/// Serves one client connection: sets up the session context and socket timeouts,
/// exchanges hello packets (receiveHello / sendHello), optionally switches to the requested
/// default database, and then processes packets in a loop until the server is cancelled,
/// the client disconnects, the connection idles out or a network error occurs.
void TCPHandler::runImpl()
{
setThreadName("TCPHandler");
ThreadStatus thread_status;
/// The connection context starts as a copy of the server context and is made a session context.
connection_context = server.context();
connection_context.makeSessionContext();
/// These timeouts can be changed after receiving query.
auto global_receive_timeout = connection_context.getSettingsRef().receive_timeout;
auto global_send_timeout = connection_context.getSettingsRef().send_timeout;
socket().setReceiveTimeout(global_receive_timeout);
socket().setSendTimeout(global_send_timeout);
socket().setNoDelay(true);
in = std::make_shared<ReadBufferFromPocoSocket>(socket());
out = std::make_shared<WriteBufferFromPocoSocket>(socket());
if (in->eof())
{
LOG_WARNING(log, "Client has not sent any data.");
return;
}
/// User will be authenticated here. It will also set settings from user profile into connection_context.
try
{
receiveHello();
}
catch (const Exception & e) /// Typical for an incorrect username, password, or address.
{
if (e.code() == ErrorCodes::CLIENT_HAS_CONNECTED_TO_WRONG_PORT)
{
LOG_DEBUG(log, "Client has connected to wrong port.");
return;
}
if (e.code() == ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF)
{
LOG_WARNING(log, "Client has gone away.");
return;
}
try
{
/// We try to send error information to the client.
sendException(e, connection_context.getSettingsRef().calculate_text_stack_trace);
}
catch (...) {}
throw;
}
/// When connecting, the default database can be specified.
if (!default_database.empty())
{
if (!DatabaseCatalog::instance().isDatabaseExist(default_database))
{
Exception e("Database " + backQuote(default_database) + " doesn't exist", ErrorCodes::UNKNOWN_DATABASE);
LOG_ERROR(log, "Code: {}, e.displayText() = {}, Stack trace:\n\n{}", e.code(), e.displayText(), e.getStackTraceString());
sendException(e, connection_context.getSettingsRef().calculate_text_stack_trace);
return;
}
connection_context.setCurrentDatabase(default_database);
}
/// Copy of the connection-level settings; used below for the poll/idle timeouts and passed to the data-reading helpers.
Settings connection_settings = connection_context.getSettings();
sendHello();
connection_context.setProgressCallback([this] (const Progress & value) { return this->updateProgress(value); });
/// Main loop: each iteration waits for a packet and, if it is a query, executes it.
while (true)
{
/// We are waiting for a packet from the client. Thus, every `poll_interval` seconds check whether we need to shut down.
{
Stopwatch idle_time;
while (!server.isCancelled() && !static_cast<ReadBufferFromPocoSocket &>(*in).poll(
std::min(connection_settings.poll_interval, connection_settings.idle_connection_timeout) * 1000000))
{
if (idle_time.elapsedSeconds() > connection_settings.idle_connection_timeout)
{
LOG_TRACE(log, "Closing idle connection");
return;
}
}
}
/// If we need to shut down, or client disconnects.
if (server.isCancelled() || in->eof())
break;
/// Set context of request.
query_context = connection_context;
Stopwatch watch;
state.reset();
/// Initialized later.
std::optional<CurrentThread::QueryScope> query_scope;
/** An exception during the execution of request (it must be sent over the network to the client).
* The client will be able to accept it, if it did not happen while sending another packet and the client has not disconnected yet.
*/
std::optional<DB::Exception> exception;
bool network_error = false;
bool send_exception_with_stack_trace = true;
try
{
/// If a user passed query-local timeouts, reset socket to initial state at the end of the query
SCOPE_EXIT({state.timeout_setter.reset();});
/** If Query - process it. If Ping or Cancel - go back to the beginning.
* There may come settings for a separate query that modify `query_context`.
*/
if (!receivePacket())
continue;
query_scope.emplace(*query_context);
send_exception_with_stack_trace = query_context->getSettingsRef().calculate_text_stack_trace;
/// Should we send internal logs to client?
const auto client_logs_level = query_context->getSettingsRef().send_logs_level;
if (client_revision >= DBMS_MIN_REVISION_WITH_SERVER_LOGS
&& client_logs_level != LogsLevel::none)
{
state.logs_queue = std::make_shared<InternalTextLogsQueue>();
state.logs_queue->max_priority = Poco::Logger::parseLevel(client_logs_level.toString());
CurrentThread::attachInternalTextLogsQueue(state.logs_queue, client_logs_level);
CurrentThread::setFatalErrorCallback([this]{ sendLogs(); });
}
/// Callback that reads external (temporary) tables data from the client.
query_context->setExternalTablesInitializer([&connection_settings, this] (Context & context)
{
if (&context != &*query_context)
throw Exception("Unexpected context in external tables initializer", ErrorCodes::LOGICAL_ERROR);
/// Get blocks of temporary tables
readData(connection_settings);
/// Reset the input stream, as we received an empty block while receiving external table data.
/// So, the stream has been marked as cancelled and we can't read from it anymore.
state.block_in.reset();
state.maybe_compressed_in.reset(); /// For more accurate accounting by MemoryTracker.
state.temporary_tables_read = true;
});
/// Send structure of columns to client for function input()
query_context->setInputInitializer([this] (Context & context, const StoragePtr & input_storage)
{
if (&context != &query_context.value())
throw Exception("Unexpected context in Input initializer", ErrorCodes::LOGICAL_ERROR);
auto metadata_snapshot = input_storage->getInMemoryMetadataPtr();
state.need_receive_data_for_input = true;
/// Send ColumnsDescription for input storage.
if (client_revision >= DBMS_MIN_REVISION_WITH_COLUMN_DEFAULTS_METADATA
&& query_context->getSettingsRef().input_format_defaults_for_omitted_fields)
{
sendTableColumns(metadata_snapshot->getColumns());
}
/// Send block to the client - input storage structure.
state.input_header = metadata_snapshot->getSampleBlock();
sendData(state.input_header);
});
/// Callback that returns the next input block, or an empty Block when readDataNext() reports no more data.
query_context->setInputBlocksReaderCallback([&connection_settings, this] (Context & context) -> Block
{
if (&context != &query_context.value())
throw Exception("Unexpected context in InputBlocksReader", ErrorCodes::LOGICAL_ERROR);
size_t poll_interval;
int receive_timeout;
std::tie(poll_interval, receive_timeout) = getReadTimeouts(connection_settings);
if (!readDataNext(poll_interval, receive_timeout))
{
state.block_in.reset();
state.maybe_compressed_in.reset();
return Block();
}
return state.block_for_input;
});
customizeContext(*query_context);
bool may_have_embedded_data = client_revision >= DBMS_MIN_REVISION_WITH_CLIENT_SUPPORT_EMBEDDED_DATA;
/// Processing Query
state.io = executeQuery(state.query, *query_context, false, state.stage, may_have_embedded_data);
if (state.io.out)
state.need_receive_data_for_insert = true;
after_check_cancelled.restart();
after_send_progress.restart();
/// Does the request require receive data from client?
if (state.need_receive_data_for_insert)
processInsertQuery(connection_settings);
else if (state.need_receive_data_for_input)
{
/// It is special case for input(), all works for reading data from client will be done in callbacks.
auto executor = state.io.pipeline.execute();
executor->execute(state.io.pipeline.getNumThreads());
state.io.onFinish();
}
else if (state.io.pipeline.initialized())
processOrdinaryQueryWithProcessors();
else
processOrdinaryQuery();
/// Do it before sending end of stream, to have a chance to show log message in client.
query_scope->logPeakMemoryUsage();
sendLogs();
sendEndOfStream();
/// QueryState should be cleared before QueryScope, since otherwise
/// the MemoryTracker will be wrong for possible deallocations.
/// (i.e. deallocations from the Aggregator with two-level aggregation)
state.reset();
query_scope.reset();
}
catch (const Exception & e)
{
state.io.onException();
exception.emplace(e);
if (e.code() == ErrorCodes::UNKNOWN_PACKET_FROM_CLIENT)
throw;
/// If a timeout occurred, try to inform client about it and close the session
if (e.code() == ErrorCodes::SOCKET_TIMEOUT)
network_error = true;
}
catch (const Poco::Net::NetException & e)
{
/** We can get here if there was an error during connection to the client,
* or in connection with a remote server that was used to process the request.
* It is not possible to distinguish between these two cases.
* Although in one of them, we have to send exception to the client, but in the other - we can not.
* We will try to send exception to the client in any case - see below.
*/
state.io.onException();
exception.emplace(Exception::CreateFromPocoTag{}, e);
}
catch (const Poco::Exception & e)
{
state.io.onException();
exception.emplace(Exception::CreateFromPocoTag{}, e);
}
// Server should die on std logic errors in debug, like with assert()
// or ErrorCodes::LOGICAL_ERROR. This helps catch these errors in
// tests.
#ifndef NDEBUG
catch (const std::logic_error & e)
{
state.io.onException();
exception.emplace(Exception::CreateFromSTDTag{}, e);
sendException(*exception, send_exception_with_stack_trace);
std::abort();
}
#endif
catch (const std::exception & e)
{
state.io.onException();
exception.emplace(Exception::CreateFromSTDTag{}, e);
}
catch (...)
{
state.io.onException();
exception.emplace("Unknown exception", ErrorCodes::UNKNOWN_EXCEPTION);
}
/// If the query failed, send the logs and then the stored exception to the client.
try
{
if (exception)
{
try
{
/// Try to send logs to client, but it could be risky too
/// Assume that we can't break output here
sendLogs();
}
catch (...)
{
tryLogCurrentException(log, "Can't send logs to client");
}
sendException(*exception, send_exception_with_stack_trace);
}
}
catch (...)
{
/** Could not send exception information to the client. */
network_error = true;
LOG_WARNING(log, "Client has gone away.");
}
/// After a failure, run the external tables initializer unless the temporary tables were already read.
try
{
if (exception && !state.temporary_tables_read)
query_context->initializeExternalTablesIfSet();
}
catch (...)
{
network_error = true;
LOG_WARNING(log, "Can't read external tables after query failure.");
}
try
{
/// QueryState should be cleared before QueryScope, since otherwise
/// the MemoryTracker will be wrong for possible deallocations.
/// (i.e. deallocations from the Aggregator with two-level aggregation)
state.reset();
query_scope.reset();
}
catch (...)
{
/** During the processing of request, there was an exception that we caught and possibly sent to client.
* When destroying the request pipeline execution there was a second exception.
* For example, a pipeline could run in multiple threads, and an exception could occur in each of them.
* Ignore it.
*/
}
watch.stop();
LOG_INFO(log, "Processed in {} sec.", watch.elapsedSeconds());
/// It is important to destroy query context here. We do not want it to live arbitrarily longer than the query.
query_context.reset();
/// A network error ends the session: stop serving this connection.
if (network_error)
break;
}
}
runImpl 主要做了以下几件事:
1 与客户端完成握手(receiveHello / sendHello)
2 接收 SQL,检查 SQL 的合法性
3 执行 SQL 请求,生成执行计划
4 输出执行结果
下一篇我们将着重介绍sql的处理。
本文由 妖言君 创作,采用 知识共享署名4.0 国际许可协议进行许可
本站文章除注明转载/出处外,均为本站原创或翻译,转载前请务必署名
最后编辑时间为: Mar 21, 2022 at 05:03 pm