c++ - Boost.Asio: Is it a good thing to use a `io_service` per connection/socket? -


i want create application implements one-thread-per-connection model. each connection must stoppable. have tried this boost.asio example implements blocking version of want. after little bit questioning i've found out there no reliable way stop session of example. i've tried implement own. had use asynchronous functions. since want make thread manage 1 connection , there no way control asynchronous job employed thread, decided use io_service each connection/socket/thread.

so approach, know better approach?

my code here can examine , review it:

#include <boost/asio.hpp> #include <boost/bind.hpp> #include <boost/array.hpp> #include <boost/thread.hpp> #include <boost/scoped_ptr.hpp> #include <list> #include <iostream> #include <string> #include <istream>  namespace ba = boost::asio; namespace bs = boost::system; namespace b  = boost;  typedef ba::ip::tcp::acceptor acceptor_type; typedef ba::ip::tcp::socket socket_type;  const short port = 11235; class server;  // connection has own io_service , socket class connection { protected:     ba::io_service service;     socket_type sock;     b::thread *thread;     ba::streambuf stream_buffer;    // reading etc     server *server;     void asyncreadstring() {         ba::async_read_until(             sock,             stream_buffer,             '\0',   // null-char delimiter             b::bind(&connection::readhandler, this,                 ba::placeholders::error,                 ba::placeholders::bytes_transferred));     }     void asyncwritestring(const std::string &s) {         std::string newstr = s + '\0';  // add null char         ba::async_write(             sock,             ba::buffer(newstr.c_str(), newstr.size()),             b::bind(&connection::writehandler, this,                 ba::placeholders::error,                 ba::placeholders::bytes_transferred));     }     virtual void session() {         asyncreadstring();         service.run();  // run @ last     }     std::string extractstring() {         std::istream is(&stream_buffer);         std::string s;         std::getline(is, s, '\0');         return s;     }     virtual void readhandler(         const bs::error_code &ec,         std::size_t bytes_transferred) {         if (!ec) {             std::cout << (extractstring() + "\n");             std::cout.flush();             asyncreadstring();  // read again         }         else {             // nothing, "this" deleted later         }     }     virtual void writehandler(         const bs::error_code &ec,         std::size_t bytes_transferred) {     } public:     connection(server *s) :         service(),         sock(service),         server(s),         thread(null)     {  }     socket_type& socket() {         return sock;     }     void start() {         if (thread) delete thread;         thread = new b::thread(             b::bind(&connection::session, this));     }     void join() {         if (thread) thread->join();     }     void stop() {         service.stop();     }     void killme();     virtual ~connection() {     } };  // server has own io_service it's used accepting class server { public:     std::list<connection*> connections; protected:     ba::io_service service;     acceptor_type acc;     b::thread *thread;     virtual void accepthandler(const bs::error_code &ec) {         if (!ec) {             connections.back()->start();             connections.push_back(new connection(this));             acc.async_accept(                 connections.back()->socket(),                 b::bind(&server::accepthandler,                     this,                     ba::placeholders::error));         }         else {             // nothing             // since new session deleted             // automatically destructor         }     }     virtual void threadfunc() {         connections.push_back(new connection(this));         acc.async_accept(             connections.back()->socket(),             b::bind(&server::accepthandler,                 this,                 ba::placeholders::error));         service.run();     } public:     server():         service(),         acc(service, ba::ip::tcp::endpoint(ba::ip::tcp::v4(), port)),         thread(null)     {  }     void start() {         if (thread) delete thread;         thread = new b::thread(             b::bind(&server::threadfunc, this));     }     void stop() {         service.stop();     }     void join() {         if (thread) thread->join();     }     void stopallconnections() {         (auto c : connections) {             c->stop();         }     }     void joinallconnections() {         (auto c : connections) {             c->join();         }     }     void killallconnections() {         (auto c : connections) {             delete c;         }         connections.clear();     }     void killconnection(connection *c) {         connections.remove(c);         delete c;     }     virtual ~server() {         delete thread;         // connection should deleted user (?)     } };  void connection::killme() {     server->killconnection(this); }  int main() {     try {         server s;         s.start();         std::cin.get(); // wait enter         s.stop();   // stop listening first         s.stopallconnections(); // interrupt ongoing connections         s.join();   // wait server, should return         s.joinallconnections(); // wait ongoing connections         s.killallconnections(); // destroy connection objects         // @ end of scope, server destroyed     }     catch (std::exception &e) {         std::cerr << "exception: " << e.what() << std::endl;         return 1;     }     return 0; } 

no. using io_service object per connection smell. since you're running each connection on dedicated thread.

at point have ask did asynchrony buy you? can have code synchronous , have same number of threads etc.

clearly want multiplex connections onto far smaller number of services. in practice there few sensible models

  1. a single io_service single service thread (this good). no tasks queued on service may ever block significant time or latency suffer

  2. a single io_service number of threads executing handlers. number of threads in pool should enough service max. number of simultaneous cpu intensive tasks supported (or again, latency start go up)

  3. an io_service per thread, 1 thread per logical core , thread affinity "sticks" core. can ideal cache locality

update: demo

here's demo shows idiomatic style using option 1. above:

live on coliru

#include <boost/array.hpp> #include <boost/asio.hpp> #include <boost/bind.hpp> #include <boost/enable_shared_from_this.hpp> #include <boost/make_shared.hpp> #include <boost/thread.hpp> #include <iostream> #include <istream> #include <list> #include <string>  namespace ba = boost::asio; namespace bs = boost::system; namespace b  = boost;  typedef ba::ip::tcp::acceptor acceptor_type; typedef ba::ip::tcp::socket   socket_type;  const short port = 11235;  // connection has own io_service , socket class connection : public b::enable_shared_from_this<connection> { public:     typedef boost::shared_ptr<connection> ptr; protected:     socket_type    sock;     ba::streambuf  stream_buffer; // reading etc     std::string    message;      void asyncreadstring() {         std::cout << __pretty_function__ << "\n";          ba::async_read_until(             sock,             stream_buffer,             '\0',   // null-char delimiter             b::bind(&connection::readhandler, shared_from_this(),                 ba::placeholders::error,                 ba::placeholders::bytes_transferred));     }     void asyncwritestring(const std::string &s) {         std::cout << __pretty_function__ << "\n";          message = s;          ba::async_write(             sock,             ba::buffer(message.c_str(), message.size()+1),             b::bind(&connection::writehandler, shared_from_this(),                 ba::placeholders::error,                 ba::placeholders::bytes_transferred));     }     std::string extractstring() {         std::cout << __pretty_function__ << "\n";          std::istream is(&stream_buffer);         std::string s;         std::getline(is, s, '\0');         return s;     }     void readhandler(         const bs::error_code &ec,         std::size_t bytes_transferred)      {         std::cout << __pretty_function__ << "\n";          if (!ec) {             std::cout << (extractstring() + "\n");             std::cout.flush();             asyncreadstring();  // read again         }         else {             // nothing, "this" deleted later         }     }     void writehandler(const bs::error_code &ec, std::size_t bytes_transferred) {         std::cout << __pretty_function__ << "\n";     } public:     connection(ba::io_service& svc) : sock(svc) { }      virtual ~connection() {         std::cout << __pretty_function__ << "\n";     }      socket_type& socket() { return sock;          }      void session()        { asyncreadstring();    }      void stop()           { sock.cancel();        } };  // server has own io_service it's used accepting class server { public:     std::list<boost::weak_ptr<connection> > m_connections; protected:     ba::io_service _service;     boost::optional<ba::io_service::work> _work;     acceptor_type _acc;     b::thread thread;      void accepthandler(const bs::error_code &ec, connection::ptr accepted) {         if (!ec) {             accepted->session();             doaccept();         }         else {             // nothing new session deleted automatically             // destructor         }     }      void doaccept() {         auto newaccept = boost::make_shared<connection>(_service);          _acc.async_accept(             newaccept->socket(),             b::bind(&server::accepthandler,                 this,                 ba::placeholders::error,                 newaccept             ));     }  public:     server():         _service(),         _work(ba::io_service::work(_service)),         _acc(_service, ba::ip::tcp::endpoint(ba::ip::tcp::v4(), port)),         thread(b::bind(&ba::io_service::run, &_service))     {  }      ~server() {         std::cout << __pretty_function__ << "\n";         stop();         _work.reset();         if (thread.joinable()) thread.join();     }      void start() {         std::cout << __pretty_function__ << "\n";         doaccept();     }      void stop() {         std::cout << __pretty_function__ << "\n";         _acc.cancel();     }      void stopallconnections() {         std::cout << __pretty_function__ << "\n";         (auto c : m_connections) {             if (auto p = c.lock())                 p->stop();         }     } };  int main() {     try {         server s;         s.start();          std::cerr << "shutdown in 2 seconds...\n";         b::this_thread::sleep_for(b::chrono::seconds(2));          std::cerr << "stop accepting...\n";         s.stop();          std::cerr << "shutdown...\n";         s.stopallconnections(); // interrupt ongoing connections     } // destructor of server join service thread     catch (std::exception &e) {         std::cerr << __function__ << ":" << __line__ << "\n";         std::cerr << "exception: " << e.what() << std::endl;         return 1;     }      std::cerr << "byebye\n"; } 

i modified main() run 2 seconds without user intervention. can demo live on coliru (of course, it's limited w.r.t number of client processes).

if run lot (a lot) of clients, using e.g.

$ time (for in {1..1000}; (sleep 1.$random; echo -e "hello world $random\\0" | netcat localhost 11235)& done; wait) 

you find 2 second window handles them all:

$ ./test | sort | uniq -c | sort -n | tail shutdown in 2 seconds... shutdown... byebye       2 hello world 28214       2 hello world 4554       2 hello world 6216       2 hello world 7864       2 hello world 9966       2 void server::stop()    1000 std::string connection::extractstring()    1001 virtual connection::~connection()    2000 void connection::asyncreadstring()    2000 void connection::readhandler(const boost::system::error_code&, std::size_t) 

if go berserk , raise 1000 e.g. 100000 there, you'll things similar to:

sehe@desktop:/tmp$ ./test | sort | uniq -c | sort -n | tail shutdown in 2 seconds... shutdown... byebye       2 hello world 5483       2 hello world 579       2 hello world 5865       2 hello world 938       2 void server::stop()       3 hello world 9613    1741 std::string connection::extractstring()    1742 virtual connection::~connection()    3482 void connection::asyncreadstring()    3482 void connection::readhandler(const boost::system::error_code&, std::size_t) 

on repeated 2-second runs of server.


Comments

Popular posts from this blog

qt - Using float or double for own QML classes -

Create Outlook appointment via C# .Net -

ios - Swift Array Resetting Itself -