clickhouse代码及原理之server启动流程
in bigdata with 0 comment

clickhouse代码及原理之server启动流程

in bigdata with 0 comment

clickhouse

clickhouse 的启动的过程

捕获.PNG

clickhouse-server 启动的主要文件是 programs/server/clickhouse-server.cpp,入口在其中的 main 方法中。

int mainEntryClickHouseServer(int argc, char ** argv)
{
    DB::Server app;
    try
    {
        return app.run(argc, argv);
    }
    catch (...)
    {
        std::cerr << DB::getCurrentExceptionMessage(true) << "\n";
        auto code = DB::getCurrentExceptionCode();
        return code ? code : 1;
    }
}

看看DB::Server 的方法
server 的run 方法

int Server::run()
{
    if (config().hasOption("help"))
    {
        Poco::Util::HelpFormatter help_formatter(Server::options());
        std::stringstream header;
        header << commandName() << " [OPTION] [-- [ARG]...]\n";
        header << "positional arguments can be used to rewrite config.xml properties, for example, --http_port=8010";
        help_formatter.setHeader(header.str());
        help_formatter.format(std::cout);
        return 0;
    }
    if (config().hasOption("version"))
    {
        std::cout << DBMS_NAME << " server version " << VERSION_STRING << VERSION_OFFICIAL << "." << std::endl;
        return 0;
    }
    return Application::run(); // NOLINT
}

很显然是Application 的run

int Application::run()
{
    int rc = EXIT_CONFIG;
    initialize(*this);

    try
    {
        rc = EXIT_SOFTWARE;
        rc = main(_unprocessedArgs);
    }
    catch (Poco::Exception& exc)
    {
        logger().log(exc);
    }
    catch (std::exception& exc)
    {
        logger().error(exc.what());
    }
    catch (...)
    {
        logger().fatal("system exception");
    }

    uninitialize();
    return rc;
}

可以看到里边有个main 方法

/// Default main(): ignores its arguments and reports success.
int Application::main(const ArgVec& /*args*/)
{
    return EXIT_OK;
}

奇怪的是这个方法只是直接返回 EXIT_OK,所以它肯定不是真正执行启动逻辑的方法。
class Server : public BaseDaemon, public IServer
class BaseDaemon : public Poco::Util::ServerApplication, public Loggers
class Util_API ServerApplication: public Application
继承链:Server -> BaseDaemon -> ServerApplication -> Application
所以Server的main方法覆盖了父类Application的main 方法才是真正启动的main 方法。

int Server::main(const std::vector<std::string> & /*args*/)
{
    Poco::Logger * log = &logger();
    UseSSL use_ssl;

    ThreadStatus thread_status;
    // 注册函数、聚合函数、表函数、存储引擎、字典和磁盘
    registerFunctions();
    registerAggregateFunctions();
    registerTableFunctions();
    registerStorages();
    registerDictionaries();
    registerDisks();

......
        for (const auto & listen_host : listen_hosts)
        {
            auto create_server = [&](const char * port_name, auto && func)
            {
                /// For testing purposes, user may omit tcp_port or http_port or https_port in configuration file.
                if (!config().has(port_name))
                    return;

                auto port = config().getInt(port_name);
                try
                {
                    func(port);
                }
                catch (const Poco::Exception &)
                {
                    std::string message = "Listen [" + listen_host + "]:" + std::to_string(port) + " failed: " + getCurrentExceptionMessage(false);

                    if (listen_try)
                    {
                        LOG_WARNING(log, "{}. If it is an IPv6 or IPv4 address and your host has disabled IPv6 or IPv4, then consider to "
                            "specify not disabled IPv4 or IPv6 address to listen in <listen_host> element of configuration "
                            "file. Example for disabled IPv6: <listen_host>0.0.0.0</listen_host> ."
                            " Example for disabled IPv4: <listen_host>::</listen_host>",
                            message);
                    }
                    else
                    {
                        throw Exception{message, ErrorCodes::NETWORK_ERROR};
                    }
                }
            };

            /// HTTP
            create_server("http_port", [&](UInt16 port)
            {
                Poco::Net::ServerSocket socket;
                auto address = socket_bind_listen(socket, listen_host, port);
                socket.setReceiveTimeout(settings.http_receive_timeout);
                socket.setSendTimeout(settings.http_send_timeout);

                servers.emplace_back(std::make_unique<Poco::Net::HTTPServer>(
                    createHandlerFactory(*this, async_metrics, "HTTPHandler-factory"), server_pool, socket, http_params));

                LOG_INFO(log, "Listening for http://{}", address.toString());
            });

            /// HTTPS
            create_server("https_port", [&](UInt16 port)
            {
#if USE_SSL
                Poco::Net::SecureServerSocket socket;
                auto address = socket_bind_listen(socket, listen_host, port, /* secure = */ true);
                socket.setReceiveTimeout(settings.http_receive_timeout);
                socket.setSendTimeout(settings.http_send_timeout);
                servers.emplace_back(std::make_unique<Poco::Net::HTTPServer>(
                    createHandlerFactory(*this, async_metrics, "HTTPSHandler-factory"), server_pool, socket, http_params));

                LOG_INFO(log, "Listening for https://{}", address.toString());
#else
                UNUSED(port);
                throw Exception{"HTTPS protocol is disabled because Poco library was built without NetSSL support.",
                    ErrorCodes::SUPPORT_IS_DISABLED};
#endif
            });

            /// TCP
            create_server("tcp_port", [&](UInt16 port)
            {
                Poco::Net::ServerSocket socket;
                auto address = socket_bind_listen(socket, listen_host, port);
                socket.setReceiveTimeout(settings.receive_timeout);
                socket.setSendTimeout(settings.send_timeout);
                servers.emplace_back(std::make_unique<Poco::Net::TCPServer>(
                    new TCPHandlerFactory(*this),
                    server_pool,
                    socket,
                    new Poco::Net::TCPServerParams));

                LOG_INFO(log, "Listening for connections with native protocol (tcp): {}", address.toString());
            });

            /// TCP with SSL
            create_server("tcp_port_secure", [&](UInt16 port)
            {
#if USE_SSL
                Poco::Net::SecureServerSocket socket;
                auto address = socket_bind_listen(socket, listen_host, port, /* secure = */ true);
                socket.setReceiveTimeout(settings.receive_timeout);
                socket.setSendTimeout(settings.send_timeout);
                servers.emplace_back(std::make_unique<Poco::Net::TCPServer>(
                    new TCPHandlerFactory(*this, /* secure= */ true),
                    server_pool,
                    socket,
                    new Poco::Net::TCPServerParams));
                LOG_INFO(log, "Listening for connections with secure native protocol (tcp_secure): {}", address.toString());
#else
                UNUSED(port);
                throw Exception{"SSL support for TCP protocol is disabled because Poco library was built without NetSSL support.",
                    ErrorCodes::SUPPORT_IS_DISABLED};
#endif
            });

            /// Interserver IO HTTP
            create_server("interserver_http_port", [&](UInt16 port)
            {
                Poco::Net::ServerSocket socket;
                auto address = socket_bind_listen(socket, listen_host, port);
                socket.setReceiveTimeout(settings.http_receive_timeout);
                socket.setSendTimeout(settings.http_send_timeout);
                servers.emplace_back(std::make_unique<Poco::Net::HTTPServer>(
                    createHandlerFactory(*this, async_metrics, "InterserverIOHTTPHandler-factory"), server_pool, socket, http_params));

                LOG_INFO(log, "Listening for replica communication (interserver): http://{}", address.toString());
            });

            create_server("interserver_https_port", [&](UInt16 port)
            {
#if USE_SSL
                Poco::Net::SecureServerSocket socket;
                auto address = socket_bind_listen(socket, listen_host, port, /* secure = */ true);
                socket.setReceiveTimeout(settings.http_receive_timeout);
                socket.setSendTimeout(settings.http_send_timeout);
                servers.emplace_back(std::make_unique<Poco::Net::HTTPServer>(
                    createHandlerFactory(*this, async_metrics, "InterserverIOHTTPSHandler-factory"), server_pool, socket, http_params));

                LOG_INFO(log, "Listening for secure replica communication (interserver): https://{}", address.toString());
#else
                UNUSED(port);
                throw Exception{"SSL support for TCP protocol is disabled because Poco library was built without NetSSL support.",
                        ErrorCodes::SUPPORT_IS_DISABLED};
#endif
            });

            create_server("mysql_port", [&](UInt16 port)
            {
                Poco::Net::ServerSocket socket;
                auto address = socket_bind_listen(socket, listen_host, port, /* secure = */ true);
                socket.setReceiveTimeout(Poco::Timespan());
                socket.setSendTimeout(settings.send_timeout);
                servers.emplace_back(std::make_unique<Poco::Net::TCPServer>(
                    new MySQLHandlerFactory(*this),
                    server_pool,
                    socket,
                    new Poco::Net::TCPServerParams));

                LOG_INFO(log, "Listening for MySQL compatibility protocol: {}", address.toString());
            });

            /// PostgreSQL compatibility protocol (skipped by create_server when "postgresql_port" is not configured).
            create_server("postgresql_port", [&](UInt16 port)
            {
                Poco::Net::ServerSocket socket;
                /// NOTE(review): `secure = true` is passed although this is a plain ServerSocket - confirm it is intended.
                auto address = socket_bind_listen(socket, listen_host, port, /* secure = */ true);
                /// The receive timeout is a default-constructed Poco::Timespan rather than a value from settings.
                socket.setReceiveTimeout(Poco::Timespan());
                socket.setSendTimeout(settings.send_timeout);
                servers.emplace_back(std::make_unique<Poco::Net::TCPServer>(
                    new PostgreSQLHandlerFactory(*this),
                    server_pool,
                    socket,
                    new Poco::Net::TCPServerParams));

                /// Pass the address as a format argument, like every other listener in this loop,
                /// instead of concatenating it into the format string.
                LOG_INFO(log, "Listening for PostgreSQL compatibility protocol: {}", address.toString());
            });

            /// Prometheus (if defined and not setup yet with http_port)
            create_server("prometheus.port", [&](UInt16 port)
            {
                Poco::Net::ServerSocket socket;
                auto address = socket_bind_listen(socket, listen_host, port);
                socket.setReceiveTimeout(settings.http_receive_timeout);
                socket.setSendTimeout(settings.http_send_timeout);
                servers.emplace_back(std::make_unique<Poco::Net::HTTPServer>(
                    createHandlerFactory(*this, async_metrics, "PrometheusHandler-factory"), server_pool, socket, http_params));

                LOG_INFO(log, "Listening for Prometheus: http://{}", address.toString());
            });
        }

        if (servers.empty())
             throw Exception("No servers started (add valid listen_host and 'tcp_port' or 'http_port' to configuration file.)",
                ErrorCodes::NO_ELEMENTS_IN_CONFIG);

        global_context->enableNamedSessions();

        for (auto & server : servers)
            server->start();

        {
            String level_str = config().getString("text_log.level", "");
            int level = level_str.empty() ? INT_MAX : Poco::Logger::parseLevel(level_str);
            setTextLog(global_context->getTextLog(), level);
        }
        buildLoggers(config(), logger());

        main_config_reloader->start();
        access_control.startPeriodicReloadingUsersConfigs();
        if (dns_cache_updater)
            dns_cache_updater->start();

        {
            LOG_INFO(log, "Available RAM: {}; physical cores: {}; logical cores: {}.",
                formatReadableSizeWithBinarySuffix(memory_amount),
                getNumberOfPhysicalCPUCores(),  // on ARM processors it can show only enabled at current moment cores
                std::thread::hardware_concurrency());
        }

        LOG_INFO(log, "Ready for connections.");

        SCOPE_EXIT({
            LOG_DEBUG(log, "Received termination signal.");
            LOG_DEBUG(log, "Waiting for current connections to close.");

            is_cancelled = true;

            int current_connections = 0;
            for (auto & server : servers)
            {
                server->stop();
                current_connections += server->currentConnections();
            }

            if (current_connections)
                LOG_INFO(log, "Closed all listening sockets. Waiting for {} outstanding connections.", current_connections);
            else
                LOG_INFO(log, "Closed all listening sockets.");

            /// Killing remaining queries.
            global_context->getProcessList().killAllQueries();

            if (current_connections)
            {
                const int sleep_max_ms = 1000 * config().getInt("shutdown_wait_unfinished", 5);
                const int sleep_one_ms = 100;
                int sleep_current_ms = 0;
                while (sleep_current_ms < sleep_max_ms)
                {
                    current_connections = 0;
                    for (auto & server : servers)
                        current_connections += server->currentConnections();
                    if (!current_connections)
                        break;
                    sleep_current_ms += sleep_one_ms;
                    std::this_thread::sleep_for(std::chrono::milliseconds(sleep_one_ms));
                }
            }

            if (current_connections)
                LOG_INFO(log, "Closed connections. But {} remain."
                    " Tip: To increase wait time add to config: <shutdown_wait_unfinished>60</shutdown_wait_unfinished>", current_connections);
            else
                LOG_INFO(log, "Closed connections.");

            dns_cache_updater.reset();
            main_config_reloader.reset();

            if (current_connections)
            {
                /// There is no better way to force connections to close in Poco.
                /// Otherwise connection handlers will continue to live
                /// (they are effectively dangling objects, but they use global thread pool
                ///  and global thread pool destructor will wait for threads, preventing server shutdown).

                /// Dump coverage here, because std::atexit callback would not be called.
                dumpCoverageReportIfPossible();
                LOG_INFO(log, "Will shutdown forcefully.");
                _exit(Application::EXIT_OK);
            }
        });

        /// try to load dictionaries immediately, throw on error and die
        ext::scope_guard dictionaries_xmls, models_xmls;
        try
        {
            if (!config().getBool("dictionaries_lazy_load", true))
            {
                global_context->tryCreateEmbeddedDictionaries();
                global_context->getExternalDictionariesLoader().enableAlwaysLoadEverything(true);
            }
            dictionaries_xmls = global_context->getExternalDictionariesLoader().addConfigRepository(
                std::make_unique<ExternalLoaderXMLConfigRepository>(config(), "dictionaries_config"));
            models_xmls = global_context->getExternalModelsLoader().addConfigRepository(
                std::make_unique<ExternalLoaderXMLConfigRepository>(config(), "models_config"));
        }
        catch (...)
        {
            LOG_ERROR(log, "Caught exception while loading dictionaries.");
            throw;
        }

        std::vector<std::unique_ptr<MetricsTransmitter>> metrics_transmitters;
        for (const auto & graphite_key : DB::getMultipleKeysFromConfig(config(), "", "graphite"))
        {
            metrics_transmitters.emplace_back(std::make_unique<MetricsTransmitter>(
                global_context->getConfigRef(), graphite_key, async_metrics));
        }

        waitForTerminationRequest();
    }

    return Application::EXIT_OK;
}

可以看到,开始是注册各种函数和引擎,接着根据我们的配置文件初始化相关的配置,最后创建 TCP、HTTP 等服务。
创建完成后再逐个启动 server

   for (auto & server : servers)
            server->start();
->
//调用的是tcpServer的模板方法
/// Starts the server: clears the stopped flag and launches the server thread with this object as its target.
void TCPServer::start()
{
    // Starting is only valid while the server is in the stopped state.
    poco_assert (_stopped);

    // Clear the flag before the thread starts; run() loops while it is false.
    _stopped = false;
    _thread.start(*this);
}

TCPServer::start 会启动一个线程(_thread.start(*this)),该线程执行的就是 TCPServer 自己的 run 方法

void TCPServer::run()
{
    while (!_stopped)
    {
        Poco::Timespan timeout(250000);
        try
        {
            if (_socket.poll(timeout, Socket::SELECT_READ))
            {
                try
                {
                    StreamSocket ss = _socket.acceptConnection();
                    
                    if (!_pConnectionFilter || _pConnectionFilter->accept(ss))
                    {
                        // enable nodelay per default: OSX really needs that
#if defined(POCO_OS_FAMILY_UNIX)
                        if (ss.address().family() != AddressFamily::UNIX_LOCAL)
#endif
                        {
                            ss.setNoDelay(true);
                        }
                        _pDispatcher->enqueue(ss);
                    }
                }
                catch (Poco::Exception& exc)
                {
                    ErrorHandler::handle(exc);
                }
                catch (std::exception& exc)
                {
                    ErrorHandler::handle(exc);
                }
                catch (...)
                {
                    ErrorHandler::handle();
                }
            }
        }
        catch (Poco::Exception& exc)
        {
            ErrorHandler::handle(exc);
            // possibly a resource issue since poll() failed;
            // give some time to recover before trying again
            Poco::Thread::sleep(50); 
        }
    }
}

真正的处理逻辑在 run 方法中。可以看到每接受一个新连接,都会通过 _pDispatcher->enqueue(ss) 把该连接的 socket 放入队列,后续由调度器的工作线程不断从队列中取出连接并处理。
这个方法在 _pDispatcher 所属的类 TCPServerDispatcher 中

void TCPServerDispatcher::run()
{
    AutoPtr<TCPServerDispatcher> guard(this, true); // ensure object stays alive

    int idleTime = (int) _pParams->getThreadIdleTime().totalMilliseconds();

    for (;;)
    {
        {
            ThreadCountWatcher tcw(this);
            try
            {
                AutoPtr<Notification> pNf = _queue.waitDequeueNotification(idleTime);
                if (pNf)
                {
                    TCPConnectionNotification* pCNf = dynamic_cast<TCPConnectionNotification*>(pNf.get());
                    if (pCNf)
                    {
                        std::unique_ptr<TCPServerConnection> pConnection(_pConnectionFactory->createConnection(pCNf->socket()));
                        poco_check_ptr(pConnection.get());
                        beginConnection();
                        pConnection->start();
                        endConnection();
                    }
                }
            }
            catch (Poco::Exception &exc) { ErrorHandler::handle(exc); }
            catch (std::exception &exc)  { ErrorHandler::handle(exc); }
            catch (...)                  { ErrorHandler::handle();    }
        }
        if (_stopped || (_currentThreads > 1 && _queue.empty())) break;
    }
}

重点看下_pConnectionFactory->createConnection(pCNf->socket())
很明显是个工厂类根据传入的类调用相应的方法创建相应连接

// TCPHandlerFactory
 /// Factory hook: creates the connection object that will serve the given socket.
 /// Returns a newly allocated TCPHandler bound to `server` and `socket`.
 Poco::Net::TCPServerConnection * createConnection(const Poco::Net::StreamSocket & socket) override
 {
     try
     {
       // Actually creates a TCPHandler.
         return new TCPHandler(server, socket);
     }
     catch (const Poco::Net::NetException &)
     {
            // (handler body elided in this excerpt)
            ...
     }
 }

tcp Handler 的run 方法

/// Thread entry for one connection: runs runImpl() and logs completion.
/// A Poco::Exception whose what() is "Timeout" is logged at debug level and swallowed;
/// any other Poco::Exception is rethrown.
void TCPHandler::run()
{
    try
    {
        runImpl();

        LOG_INFO(log, "Done processing connection.");
    }
    catch (Poco::Exception & e)
    {
        /// Anything other than a timeout is propagated to the caller.
        const bool is_timeout = strcmp(e.what(), "Timeout") == 0;
        if (!is_timeout)
            throw;

        /// Timeout - not an error.
        LOG_DEBUG(log, "Poco::Exception. Code: {}, e.code() = {}, e.displayText() = {}, e.what() = {}", ErrorCodes::POCO_EXCEPTION, e.code(), e.displayText(), e.what());
    }
}

}

继续看runImpl

/// Serves one client connection.
/// Sequence visible below: set up the per-connection context, socket timeouts and read/write buffers;
/// exchange hello packets (receiveHello / sendHello); optionally switch to the requested default database;
/// then loop, receiving packets and executing queries, until the server is cancelled, the client
/// disconnects, the connection stays idle for too long, or a network error occurs.
void TCPHandler::runImpl()
{
    setThreadName("TCPHandler");
    ThreadStatus thread_status;

    connection_context = server.context();
    connection_context.makeSessionContext();

    /// These timeouts can be changed after receiving query.

    auto global_receive_timeout = connection_context.getSettingsRef().receive_timeout;
    auto global_send_timeout = connection_context.getSettingsRef().send_timeout;

    socket().setReceiveTimeout(global_receive_timeout);
    socket().setSendTimeout(global_send_timeout);
    socket().setNoDelay(true);

    /// Read and write buffers wrapping the connection socket.
    in = std::make_shared<ReadBufferFromPocoSocket>(socket());
    out = std::make_shared<WriteBufferFromPocoSocket>(socket());

    /// Nothing was received before end of stream: log it and finish.
    if (in->eof())
    {
        LOG_WARNING(log, "Client has not sent any data.");
        return;
    }

    /// User will be authenticated here. It will also set settings from user profile into connection_context.
    try
    {
        receiveHello();
    }
    catch (const Exception & e) /// Typical for an incorrect username, password, or address.
    {
        if (e.code() == ErrorCodes::CLIENT_HAS_CONNECTED_TO_WRONG_PORT)
        {
            LOG_DEBUG(log, "Client has connected to wrong port.");
            return;
        }

        if (e.code() == ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF)
        {
            LOG_WARNING(log, "Client has gone away.");
            return;
        }

        try
        {
            /// We try to send error information to the client.
            sendException(e, connection_context.getSettingsRef().calculate_text_stack_trace);
        }
        /// Best effort: a failure to report must not replace the original exception, which is rethrown below.
        catch (...) {}

        throw;
    }

    /// When connecting, the default database can be specified.
    if (!default_database.empty())
    {
        if (!DatabaseCatalog::instance().isDatabaseExist(default_database))
        {
            Exception e("Database " + backQuote(default_database) + " doesn't exist", ErrorCodes::UNKNOWN_DATABASE);
            LOG_ERROR(log, "Code: {}, e.displayText() = {}, Stack trace:\n\n{}", e.code(), e.displayText(), e.getStackTraceString());
            sendException(e, connection_context.getSettingsRef().calculate_text_stack_trace);
            return;
        }

        connection_context.setCurrentDatabase(default_database);
    }

    /// Copy of the connection-level settings; used below for polling and for reading data from the client.
    Settings connection_settings = connection_context.getSettings();

    sendHello();

    connection_context.setProgressCallback([this] (const Progress & value) { return this->updateProgress(value); });

    /// Main loop: one iteration per packet received from the client.
    while (true)
    {
        /// We are waiting for a packet from the client. Thus, every `poll_interval` seconds check whether we need to shut down.
        {
            Stopwatch idle_time;
            while (!server.isCancelled() && !static_cast<ReadBufferFromPocoSocket &>(*in).poll(
                std::min(connection_settings.poll_interval, connection_settings.idle_connection_timeout) * 1000000))
            {
                if (idle_time.elapsedSeconds() > connection_settings.idle_connection_timeout)
                {
                    LOG_TRACE(log, "Closing idle connection");
                    return;
                }
            }
        }

        /// If we need to shut down, or client disconnects.
        if (server.isCancelled() || in->eof())
            break;

        /// Set context of request.
        query_context = connection_context;

        Stopwatch watch;
        state.reset();

        /// Initialized later.
        std::optional<CurrentThread::QueryScope> query_scope;

        /** An exception during the execution of request (it must be sent over the network to the client).
         *  The client will be able to accept it, if it did not happen while sending another packet and the client has not disconnected yet.
         */
        std::optional<DB::Exception> exception;
        bool network_error = false;

        bool send_exception_with_stack_trace = true;

        try
        {
            /// If a user passed query-local timeouts, reset socket to initial state at the end of the query
            SCOPE_EXIT({state.timeout_setter.reset();});

            /** If Query - process it. If Ping or Cancel - go back to the beginning.
             *  There may come settings for a separate query that modify `query_context`.
             */
            if (!receivePacket())
                continue;

            query_scope.emplace(*query_context);

            send_exception_with_stack_trace = query_context->getSettingsRef().calculate_text_stack_trace;

            /// Should we send internal logs to client?
            const auto client_logs_level = query_context->getSettingsRef().send_logs_level;
            if (client_revision >= DBMS_MIN_REVISION_WITH_SERVER_LOGS
                && client_logs_level != LogsLevel::none)
            {
                state.logs_queue = std::make_shared<InternalTextLogsQueue>();
                state.logs_queue->max_priority = Poco::Logger::parseLevel(client_logs_level.toString());
                CurrentThread::attachInternalTextLogsQueue(state.logs_queue, client_logs_level);
                CurrentThread::setFatalErrorCallback([this]{ sendLogs(); });
            }

            /// Callback that reads external (temporary) table data from the client.
            query_context->setExternalTablesInitializer([&connection_settings, this] (Context & context)
            {
                if (&context != &*query_context)
                    throw Exception("Unexpected context in external tables initializer", ErrorCodes::LOGICAL_ERROR);

                /// Get blocks of temporary tables
                readData(connection_settings);

                /// Reset the input stream, as we received an empty block while receiving external table data.
                /// So, the stream has been marked as cancelled and we can't read from it anymore.
                state.block_in.reset();
                state.maybe_compressed_in.reset(); /// For more accurate accounting by MemoryTracker.

                state.temporary_tables_read = true;
            });

            /// Send structure of columns to client for function input()
            query_context->setInputInitializer([this] (Context & context, const StoragePtr & input_storage)
            {
                if (&context != &query_context.value())
                    throw Exception("Unexpected context in Input initializer", ErrorCodes::LOGICAL_ERROR);

                auto metadata_snapshot = input_storage->getInMemoryMetadataPtr();
                state.need_receive_data_for_input = true;

                /// Send ColumnsDescription for input storage.
                if (client_revision >= DBMS_MIN_REVISION_WITH_COLUMN_DEFAULTS_METADATA
                    && query_context->getSettingsRef().input_format_defaults_for_omitted_fields)
                {
                    sendTableColumns(metadata_snapshot->getColumns());
                }

                /// Send block to the client - input storage structure.
                state.input_header = metadata_snapshot->getSampleBlock();
                sendData(state.input_header);
            });

            /// Callback that returns the next block read from the client, or an empty Block when reading is finished.
            query_context->setInputBlocksReaderCallback([&connection_settings, this] (Context & context) -> Block
            {
                if (&context != &query_context.value())
                    throw Exception("Unexpected context in InputBlocksReader", ErrorCodes::LOGICAL_ERROR);

                size_t poll_interval;
                int receive_timeout;
                std::tie(poll_interval, receive_timeout) = getReadTimeouts(connection_settings);
                if (!readDataNext(poll_interval, receive_timeout))
                {
                    state.block_in.reset();
                    state.maybe_compressed_in.reset();
                    return Block();
                }
                return state.block_for_input;
            });

            customizeContext(*query_context);

            bool may_have_embedded_data = client_revision >= DBMS_MIN_REVISION_WITH_CLIENT_SUPPORT_EMBEDDED_DATA;
            /// Processing Query
            state.io = executeQuery(state.query, *query_context, false, state.stage, may_have_embedded_data);

            if (state.io.out)
                state.need_receive_data_for_insert = true;

            after_check_cancelled.restart();
            after_send_progress.restart();

            /// Does the request require receive data from client?
            if (state.need_receive_data_for_insert)
                processInsertQuery(connection_settings);
            else if (state.need_receive_data_for_input)
            {
                /// It is special case for input(), all works for reading data from client will be done in callbacks.
                auto executor = state.io.pipeline.execute();
                executor->execute(state.io.pipeline.getNumThreads());
                state.io.onFinish();
            }
            else if (state.io.pipeline.initialized())
                processOrdinaryQueryWithProcessors();
            else
                processOrdinaryQuery();

            /// Do it before sending end of stream, to have a chance to show log message in client.
            query_scope->logPeakMemoryUsage();

            sendLogs();
            sendEndOfStream();

            /// QueryState should be cleared before QueryScope, since otherwise
            /// the MemoryTracker will be wrong for possible deallocations.
            /// (i.e. deallocations from the Aggregator with two-level aggregation)
            state.reset();
            query_scope.reset();
        }
        catch (const Exception & e)
        {
            state.io.onException();
            exception.emplace(e);

            /// An unknown packet is rethrown, which leaves this function.
            if (e.code() == ErrorCodes::UNKNOWN_PACKET_FROM_CLIENT)
                throw;

            /// If a timeout occurred, try to inform client about it and close the session
            if (e.code() == ErrorCodes::SOCKET_TIMEOUT)
                network_error = true;
        }
        catch (const Poco::Net::NetException & e)
        {
            /** We can get here if there was an error during connection to the client,
             *  or in connection with a remote server that was used to process the request.
             *  It is not possible to distinguish between these two cases.
             *  Although in one of them, we have to send exception to the client, but in the other - we can not.
             *  We will try to send exception to the client in any case - see below.
             */
            state.io.onException();
            exception.emplace(Exception::CreateFromPocoTag{}, e);
        }
        catch (const Poco::Exception & e)
        {
            state.io.onException();
            exception.emplace(Exception::CreateFromPocoTag{}, e);
        }
// Server should die on std logic errors in debug, like with assert()
// or ErrorCodes::LOGICAL_ERROR. This helps catch these errors in
// tests.
#ifndef NDEBUG
        catch (const std::logic_error & e)
        {
            state.io.onException();
            exception.emplace(Exception::CreateFromSTDTag{}, e);
            sendException(*exception, send_exception_with_stack_trace);
            std::abort();
        }
#endif
        catch (const std::exception & e)
        {
            state.io.onException();
            exception.emplace(Exception::CreateFromSTDTag{}, e);
        }
        catch (...)
        {
            state.io.onException();
            exception.emplace("Unknown exception", ErrorCodes::UNKNOWN_EXCEPTION);
        }

        /// Report the stored exception (if any) to the client.
        try
        {
            if (exception)
            {
                try
                {
                    /// Try to send logs to client, but it could be risky too
                    /// Assume that we can't break output here
                    sendLogs();
                }
                catch (...)
                {
                    tryLogCurrentException(log, "Can't send logs to client");
                }

                sendException(*exception, send_exception_with_stack_trace);
            }
        }
        catch (...)
        {
            /** Could not send exception information to the client. */
            network_error = true;
            LOG_WARNING(log, "Client has gone away.");
        }

        /// After a failed query, still run the external tables initializer if it has not run yet.
        try
        {
            if (exception && !state.temporary_tables_read)
                query_context->initializeExternalTablesIfSet();
        }
        catch (...)
        {
            network_error = true;
            LOG_WARNING(log, "Can't read external tables after query failure.");
        }


        try
        {
            /// QueryState should be cleared before QueryScope, since otherwise
            /// the MemoryTracker will be wrong for possible deallocations.
            /// (i.e. deallocations from the Aggregator with two-level aggregation)
            state.reset();
            query_scope.reset();
        }
        catch (...)
        {
            /** During the processing of request, there was an exception that we caught and possibly sent to client.
             *  When destroying the request pipeline execution there was a second exception.
             *  For example, a pipeline could run in multiple threads, and an exception could occur in each of them.
             *  Ignore it.
             */
        }

        watch.stop();

        LOG_INFO(log, "Processed in {} sec.", watch.elapsedSeconds());

        /// It is important to destroy query context here. We do not want it to live arbitrarily longer than the query.
        query_context.reset();

        /// A network error ends the connection loop.
        if (network_error)
            break;
    }
}

runImpl 主要完成以下几件事:
1 与客户端握手(receiveHello / sendHello)
2 接收 sql,检查 sql 的合法性
3 执行 sql 请求,生成执行计划
4 输出执行结果
下一篇我们将着重介绍 sql 的处理。

Responses