libmoost
|
00001 /* vim:set ts=3 sw=3 sts=3 et: */ 00028 #include <cstdio> 00029 #include <set> 00030 #include <queue> 00031 #include <string> 00032 #include <stdexcept> 00033 #include <boost/asio.hpp> 00034 #include <boost/bind.hpp> 00035 #include <boost/enable_shared_from_this.hpp> 00036 #include <boost/noncopyable.hpp> 00037 #include <boost/foreach.hpp> 00038 #include <boost/algorithm/string/trim.hpp> 00039 #include <boost/thread.hpp> 00040 #include <boost/date_time/posix_time/posix_time.hpp> 00041 00042 #include "../../include/moost/terminal_format.hpp" 00043 #include "../../include/moost/utils/foreach.hpp" 00044 #include "../../include/moost/io/helper.hpp" 00045 #include "../../include/moost/io/async_stream_forwarder.hpp" 00046 #include "../../include/moost/service/remote_shell.h" 00047 #include "../../include/moost/service/posix_stream_stealer.h" 00048 00049 #if defined(_POSIX_SOURCE) || defined(__CYGWIN__) 00050 // needed for ::dup(), ::write(), ::fileno() 00051 # include <unistd.h> 00052 #elif defined(_WIN32) 00053 # include <windows.h> 00054 #else 00055 # error "apparently no local shell support has been added for this platform" 00056 #endif 00057 00058 /* 00059 * TODO: cleanup stdstream/dup/fileno/read/write/pipe stuff 00060 */ 00061 00062 using boost::asio::ip::tcp; 00063 using namespace boost; 00064 using namespace moost; 00065 using namespace moost::service; 00066 00067 class session_base; 00068 typedef shared_ptr<session_base> session_ptr; 00069 typedef shared_ptr<tcp::socket> socket_ptr; 00070 typedef void (session_base::*session_meth)(const char *buffer, size_t count); 00071 00072 namespace moost { namespace service { 00073 00074 class remote_shell_server_impl : public noncopyable 00075 { 00076 private: 00077 struct noop_pre_shutdown_func 00078 { 00079 void operator()() 00080 { 00081 } 00082 }; 00083 00084 struct command 00085 { 00086 session_ptr session; 00087 std::string cmd; 00088 std::string args; 00089 00090 command() 00091 {} 00092 00093 command(const session_ptr& s, const std::string& c, const std::string& a) 00094 : session(s), cmd(c), args(a) 00095 {} 00096 }; 00097 00098 #ifdef BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR 00099 typedef asio::posix::basic_stream_descriptor<> stdstream; 00100 #endif 00101 00102 void handle_accept(socket_ptr socket, remote_shell_iface *rsi, const system::error_code& error); 00103 void handle_stop(const std::string& msg); 00104 void accept_session(remote_shell_iface *rsi); 00105 void pre_shutdown(); 00106 00107 #ifdef BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR 00108 bool snoop_stdio(posix_stream_stealer& stealer, FILE *stream, stdstream *bsd, session_meth cb); 00109 void on_stdio_read(stdstream *bsd, session_meth cb, const system::error_code& error, char *buffer, size_t count); 00110 void stdio_read_more(stdstream *bsd, session_meth cb, char *buffer); 00111 #endif 00112 00113 bool setup_stdio_snoopers(); 00114 void teardown_stdio_snoopers(); 00115 00116 bool create_console_session(remote_shell_iface *rsi, session_ptr& new_session, stream_writer_ptr& writer); 00117 bool setup_console_session(remote_shell_iface *rsi); 00118 00119 void command_thread(remote_shell_iface *rsi); 00120 00121 shared_ptr<asio::io_service> m_ios; 00122 shared_ptr<tcp::acceptor> m_acceptor; 00123 std::set<session_ptr> m_sessions; 00124 appender_factory_ptr m_app_factory; 00125 #ifdef BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR 00126 stdstream m_stdout; 00127 stdstream m_stderr; 00128 posix_stream_stealer m_stdout_stealer; 00129 posix_stream_stealer m_stderr_stealer; 00130 #endif 00131 std::string m_welcome; 00132 bool m_default_stdout_state; 00133 bool m_default_stderr_state; 00134 bool m_enable_local; 00135 bool m_can_snoop_stdio; 00136 unsigned short m_listen_port; 00137 shared_ptr<thread> m_cmd_runner; 00138 std::queue< command > m_cmd_queue; 00139 mutex m_cmd_queue_mutex; 00140 condition_variable m_cmd_queue_cond; 00141 boost::function0<void> m_pre_shutdown_func; 00142 00143 public: 00144 remote_shell_server_impl(shared_ptr<asio::io_service> ios); 00145 ~remote_shell_server_impl(); 00146 void run(remote_shell_iface *rsi); 00147 void stop(const std::string& msg); 00148 void remove_session(session_ptr p); 00149 void set_appender_factory(appender_factory_ptr app_factory); 00150 void set_default_stdout_state(bool enabled); 00151 void set_default_stderr_state(bool enabled); 00152 void set_listen_port(unsigned short port); 00153 void set_pre_shutdown_function(boost::function0<void>& func); 00154 void enable_local_shell(bool enabled); 00155 void get_sessions_list(std::string& rv); 00156 void process_command(session_ptr session, const std::string& cmd, const std::string& args); 00157 00158 const std::string& welcome() const 00159 { 00160 return m_welcome; 00161 } 00162 }; 00163 00164 } } 00165 00166 class session_io_socket 00167 { 00168 public: 00169 session_io_socket(socket_ptr socket) 00170 : m_socket(socket) 00171 { 00172 } 00173 00174 void set_nodelay() 00175 { 00176 m_socket->set_option(tcp::no_delay(true)); 00177 } 00178 00179 std::string get_peer_string() const 00180 { 00181 std::ostringstream oss; 00182 oss << m_socket->remote_endpoint(); 00183 return oss.str(); 00184 } 00185 00186 void close() 00187 { 00188 mutex::scoped_lock lock(m_mutex); 00189 m_socket->close(); 00190 } 00191 00192 template <typename HandlerT> 00193 void write_stdout(const std::string& data, HandlerT handler) 00194 { 00195 mutex::scoped_lock lock(m_mutex); 00196 asio::async_write(*m_socket, asio::buffer(data.c_str(), data.length()), handler); 00197 } 00198 00199 template <typename HandlerT> 00200 void write_stderr(const std::string& data, HandlerT handler) 00201 { 00202 mutex::scoped_lock lock(m_mutex); 00203 asio::async_write(*m_socket, asio::buffer(data.c_str(), data.length()), handler); 00204 } 00205 00206 template <typename HandlerT> 00207 void read_some(void *data, size_t size, HandlerT handler) 00208 { 00209 mutex::scoped_lock lock(m_mutex); 00210 m_socket->async_read_some(asio::buffer(data, size), handler); 00211 } 00212 00213 private: 00214 socket_ptr m_socket; 00215 mutex m_mutex; 00216 }; 00217 00218 class session_io_console 00219 { 00220 public: 00221 typedef moost::io::helper::native_io_t native_io_t; 00222 00223 session_io_console(shared_ptr<asio::io_service> ios, native_io_t in_fd, native_io_t out_fd, native_io_t err_fd, const std::string& name) 00224 : m_ios(ios) 00225 , m_in_fwd(ios, in_fd) 00226 , m_out(out_fd) 00227 , m_err(err_fd) 00228 , m_name(name) 00229 { 00230 } 00231 00232 void set_nodelay() 00233 { 00234 // nothing to do here 00235 } 00236 00237 const std::string& get_peer_string() const 00238 { 00239 return m_name; 00240 } 00241 00242 void close() 00243 { 00244 // this is required to make sure the async read finishes 00245 m_in_fwd.close(); 00246 } 00247 00248 template <typename HandlerT> 00249 void write_stdout(const std::string& data, HandlerT handler) 00250 { 00251 write_console(m_out, data, handler); 00252 } 00253 00254 template <typename HandlerT> 00255 void write_stderr(const std::string& data, HandlerT handler) 00256 { 00257 write_console(m_err, data, handler); 00258 } 00259 00260 template <typename HandlerT> 00261 void read_some(void *data, size_t size, HandlerT handler) 00262 { 00263 m_in_fwd.read_async(data, size, handler); 00264 } 00265 00266 private: 00267 template <typename HandlerT> 00268 void write_console(native_io_t fd, const std::string& data, HandlerT handler) 00269 { 00270 // We can't use boost::asio for this, as we could be writing 00271 // to a regular file, for which the asio reactor (e.g. epoll 00272 // or poll) might not have support. 00273 00274 // So we just use the native, synchronous file i/o, but fake 00275 // it to behave as similar to boost::asio as possible. 00276 00277 boost::system::error_code ec; 00278 size_t written = 0; 00279 00280 { 00281 mutex::scoped_lock lock(m_mutex); 00282 00283 if (!moost::io::helper::write(fd, data.c_str(), data.length(), &written)) 00284 { 00285 ec.assign(moost::io::helper::error(), boost::asio::error::get_system_category()); 00286 written = 0; 00287 } 00288 } 00289 00290 // Make sure the handler is called synchronously, modelling 00291 // the behaviour of boost::asio. 00292 00293 m_ios->post(bind(handler, ec, static_cast<std::size_t>(written))); 00294 } 00295 00296 mutex m_mutex; 00297 shared_ptr<asio::io_service> m_ios; 00298 moost::io::async_stream_forwarder m_in_fwd; 00299 native_io_t m_out; 00300 native_io_t m_err; 00301 const std::string m_name; 00302 }; 00303 00304 template <class SessionIoT> 00305 class session_writer : public stream_writer_iface 00306 { 00307 public: 00308 session_writer(shared_ptr<SessionIoT> io) 00309 : m_io(io) 00310 { 00311 } 00312 00313 virtual void write(const char *data, size_t len) 00314 { 00315 std::string *s = new std::string(data, len); 00316 m_io->write_stdout(*s, bind(&session_writer<SessionIoT>::on_write_done, s)); 00317 } 00318 00319 private: 00320 static void on_write_done(std::string *str) 00321 { 00322 delete str; 00323 } 00324 00325 shared_ptr<SessionIoT> m_io; 00326 }; 00327 00328 class session_base : public enable_shared_from_this<session_base>, public noncopyable 00329 { 00330 protected: 00331 enum state { 00332 SESSION_CREATED, 00333 SESSION_ATTACHED, 00334 SESSION_STOPPING, 00335 SESSION_STOPPED 00336 }; 00337 00338 public: 00339 session_base(remote_shell_server_impl& srv, remote_shell_iface *rsi, bool allow_quit, bool enable_cout_cerr, bool enable_cls) 00340 : m_srv(srv) 00341 , m_state(SESSION_CREATED) 00342 , m_rsi(rsi) 00343 , m_cout_on(true) 00344 , m_cerr_on(true) 00345 , m_processing_input(true) 00346 , m_allow_quit(allow_quit) 00347 , m_enable_cout_cerr(enable_cout_cerr) 00348 , m_enable_cls(enable_cls) 00349 , m_t_connect(boost::posix_time::second_clock::universal_time()) 00350 { 00351 } 00352 00353 virtual ~session_base() 00354 { 00355 } 00356 00357 void start(appender_ptr appender) 00358 { 00359 set_nodelay(); 00360 m_app = appender; 00361 m_app->attach(); 00362 m_state = SESSION_ATTACHED; 00363 m_peer = get_peer_string(); 00364 continue_session("accepted client connection from " + m_peer + "\r\n" + m_srv.welcome()); 00365 read_more(); 00366 } 00367 00368 void stop(const std::string& msg) 00369 { 00370 if (is_attached()) 00371 { 00372 write("\r\n" + msg, SESSION_STOPPING); 00373 } 00374 } 00375 00376 void set_stdout_state(bool on) 00377 { 00378 m_cout_on = on; 00379 } 00380 00381 void set_stderr_state(bool on) 00382 { 00383 m_cerr_on = on; 00384 } 00385 00386 std::string get_info() const 00387 { 00388 boost::posix_time::time_duration duration = boost::posix_time::second_clock::universal_time() - m_t_connect; 00389 std::ostringstream oss; 00390 00391 oss << m_peer << " [" << duration << "]"; 00392 00393 return oss.str(); 00394 } 00395 00396 void add_stdout(const char *buffer, size_t count); 00397 void add_stderr(const char *buffer, size_t count); 00398 void command_result(bool handled, const std::string& cmd, const std::string& rv); 00399 00400 protected: 00401 virtual void set_nodelay() = 0; 00402 virtual std::string get_peer_string() const = 0; 00403 virtual void close() = 0; 00404 virtual void read_more(char *data, size_t max) = 0; 00405 virtual void write_stdout(std::string *s, state st) = 0; 00406 virtual void write_stderr(std::string *s, state st) = 0; 00407 00408 void on_write_done(std::string *s, state st, const system::error_code& error); 00409 void on_read_done(const system::error_code& error, size_t bytes_transferred); 00410 00411 private: 00412 void read_more() 00413 { 00414 read_more(m_data, max_length); 00415 } 00416 00417 void write(const std::string& str) 00418 { 00419 write(str, m_state); 00420 } 00421 00422 void write(const std::string& str, state st) 00423 { 00424 std::string *s = new std::string(str); 00425 write_stdout(s, st); 00426 } 00427 00428 bool is_attached() const 00429 { 00430 return m_state >= SESSION_ATTACHED && m_state < SESSION_STOPPED; 00431 } 00432 00433 bool is_stopped() const 00434 { 00435 return m_state >= SESSION_STOPPED; 00436 } 00437 00438 template <typename T> 00439 void get_more_help(std::ostream& os, const T& obj) const; 00440 00441 void get_help(std::string& rv) const; 00442 void continue_session(const std::string& str = ""); 00443 void handle_stop(); 00444 bool handle_command(std::string& rv, const std::string& cmd, const std::string& args); 00445 bool read_next_command(std::string& cmd, std::string& args); 00446 bool log_level(std::string& rv, const std::string& args); 00447 void process_input(); 00448 00449 remote_shell_server_impl& m_srv; 00450 std::string m_peer; 00451 enum { max_length = 1024 }; 00452 char m_data[max_length]; 00453 appender_ptr m_app; 00454 enum state m_state; 00455 remote_shell_iface * const m_rsi; 00456 std::string m_inbuf; 00457 bool m_cout_on; 00458 bool m_cerr_on; 00459 bool m_processing_input; 00460 const bool m_allow_quit; 00461 const bool m_enable_cout_cerr; 00462 const bool m_enable_cls; 00463 const boost::posix_time::ptime m_t_connect; 00464 }; 00465 00466 template <class SessionIoT, bool AllowQuit = true, bool EnableCLS = true> 00467 class session : public session_base 00468 { 00469 public: 00470 session(remote_shell_server_impl& srv, remote_shell_iface *rsi, shared_ptr<SessionIoT> io, bool enable_cout_cerr = true) 00471 : session_base(srv, rsi, AllowQuit, enable_cout_cerr, EnableCLS) 00472 , m_io(io) 00473 { 00474 } 00475 00476 protected: 00477 virtual void set_nodelay() 00478 { 00479 m_io->set_nodelay(); 00480 } 00481 00482 virtual std::string get_peer_string() const 00483 { 00484 return m_io->get_peer_string(); 00485 } 00486 00487 virtual void close() 00488 { 00489 return m_io->close(); 00490 } 00491 00492 virtual void read_more(char *data, size_t max) 00493 { 00494 m_io->read_some(data, max, bind(&session::on_read_done, shared_from_this(), 00495 asio::placeholders::error, asio::placeholders::bytes_transferred)); 00496 } 00497 00498 virtual void write_stdout(std::string *s, state st) 00499 { 00500 m_io->write_stdout(*s, bind(&session::on_write_done, shared_from_this(), s, st, asio::placeholders::error)); 00501 } 00502 00503 virtual void write_stderr(std::string *s, state st) 00504 { 00505 m_io->write_stderr(*s, bind(&session::on_write_done, shared_from_this(), s, st, asio::placeholders::error)); 00506 } 00507 00508 private: 00509 shared_ptr<SessionIoT> m_io; 00510 }; 00511 00512 /**********************************************************************/ 00513 00514 void session_base::add_stdout(const char *buffer, size_t count) 00515 { 00516 if (m_cout_on) 00517 { 00518 std::string *s = new std::string(); 00519 s->append(terminal_format::color(C_GREEN)); 00520 s->append(buffer, count); 00521 s->append(terminal_format::reset()); 00522 write_stdout(s, m_state); 00523 } 00524 } 00525 00526 void session_base::add_stderr(const char *buffer, size_t count) 00527 { 00528 if (m_cerr_on) 00529 { 00530 std::string *s = new std::string(); 00531 s->append(terminal_format::color(C_RED)); 00532 s->append(terminal_format::bold()); 00533 s->append(buffer, count); 00534 s->append(terminal_format::reset()); 00535 write_stderr(s, m_state); 00536 } 00537 } 00538 00539 template <typename T> 00540 void session_base::get_more_help(std::ostream& os, const T& obj) const 00541 { 00542 std::string help; 00543 00544 try 00545 { 00546 help = obj.show_help(); 00547 } 00548 catch (const std::exception& e) 00549 { 00550 help = std::string("exception while getting help: ") + e.what() + "\r\n"; 00551 } 00552 catch (...) 00553 { 00554 help = "unknown exception while getting help\r\n"; 00555 } 00556 00557 if (!help.empty()) 00558 { 00559 os << "-------------------------------------------------------\r\n" 00560 << help; 00561 } 00562 } 00563 00564 void session_base::get_help(std::string& rv) const 00565 { 00566 std::ostringstream oss; 00567 00568 oss << "=======================================================\r\n" 00569 "- help show this help\r\n"; 00570 00571 if (m_enable_cout_cerr) 00572 { 00573 oss << "- cout [on|off] get [set] stdout " << (m_cout_on ? "<ON>/off" : "on/<OFF>") << "\r\n" 00574 "- cerr [on|off] get [set] stderr " << (m_cerr_on ? "<ON>/off" : "on/<OFF>") << "\r\n"; 00575 } 00576 00577 oss << "- sessions show shell sessions\r\n"; 00578 00579 if (m_enable_cls) 00580 { 00581 oss << "- clear|cls clear screen\r\n"; 00582 } 00583 00584 if (m_allow_quit) 00585 { 00586 oss << "- quit|exit|bye quit this connection\r\n"; 00587 } 00588 00589 oss << "- shutdown shut down application\r\n"; 00590 00591 get_more_help(oss, *m_app); 00592 get_more_help(oss, *m_rsi); 00593 00594 oss << "=======================================================\r\n"; 00595 00596 rv = oss.str(); 00597 } 00598 00599 void session_base::on_write_done(std::string *s, state st, const system::error_code& error) 00600 { 00601 if (st > m_state) 00602 { 00603 m_state = st; 00604 } 00605 00606 delete s; 00607 00608 if (!error) 00609 { 00610 switch (m_state) 00611 { 00612 case SESSION_ATTACHED: 00613 break; 00614 00615 case SESSION_STOPPING: 00616 handle_stop(); 00617 break; 00618 00619 default: 00620 handle_stop(); 00621 break; 00622 } 00623 } 00624 else if (!is_stopped()) 00625 { 00626 handle_stop(); 00627 } 00628 } 00629 00630 void session_base::on_read_done(const system::error_code& error, size_t bytes_transferred) 00631 { 00632 if (error) 00633 { 00634 handle_stop(); 00635 return; 00636 } 00637 00638 m_inbuf.append(m_data, bytes_transferred); 00639 00640 process_input(); 00641 } 00642 00643 void session_base::process_input() 00644 { 00645 while (m_processing_input) 00646 { 00647 std::string cmd, args; 00648 00649 if (!read_next_command(cmd, args)) 00650 { 00651 read_more(); 00652 return; 00653 } 00654 00655 if (cmd.empty()) 00656 { 00657 continue_session(); 00658 } 00659 else 00660 { 00661 std::string rv; 00662 00663 if (handle_command(rv, cmd, args)) 00664 { 00665 continue_session(rv); 00666 } 00667 else 00668 { 00669 /* 00670 * This is either a service command, or an unknown command. 00671 * (Only the service can tell for sure, unfortunately.) 00672 * In any case, it potentially accesses a global resource which 00673 * may not be accessed concurrently from multiple shell sessions. 00674 * Thus, we route the request through the server who will queue 00675 * all service commands and process them sequentially in a 00676 * separate thread in order to avoid blocking of the clients. 00677 */ 00678 00679 m_processing_input = false; 00680 00681 m_srv.process_command(shared_from_this(), cmd, args); 00682 } 00683 } 00684 } 00685 } 00686 00687 void session_base::command_result(bool handled, const std::string& cmd, const std::string& rv) 00688 { 00689 if (handled) 00690 { 00691 continue_session(rv); 00692 } 00693 else 00694 { 00695 continue_session("unknown command: " + cmd); 00696 } 00697 00698 m_processing_input = true; 00699 00700 process_input(); 00701 } 00702 00703 void session_base::continue_session(const std::string& str) 00704 { 00705 if (!is_stopped()) 00706 { 00707 std::ostringstream oss; 00708 00709 oss << str; 00710 00711 if (!str.empty() && str[str.length() - 1] != '\n') 00712 { 00713 oss << "\r\n"; 00714 } 00715 00716 try 00717 { 00718 oss << m_rsi->get_prompt(); 00719 } 00720 catch (const std::exception& e) 00721 { 00722 oss << "(exception while getting prompt: " << e.what() << ")> "; 00723 } 00724 catch (...) 00725 { 00726 oss << "(unknown exception while getting prompt)> "; 00727 } 00728 00729 write(oss.str()); 00730 } 00731 } 00732 00733 void session_base::handle_stop() 00734 { 00735 if (!is_stopped()) 00736 { 00737 close(); 00738 00739 if (is_attached()) 00740 { 00741 m_app->detach(); 00742 } 00743 00744 m_cerr_on = m_cout_on = false; 00745 00746 m_state = SESSION_STOPPED; 00747 00748 m_srv.remove_session(shared_from_this()); 00749 } 00750 } 00751 00752 bool session_base::handle_command(std::string& rv, const std::string& cmd, const std::string& args) 00753 { 00754 if (m_allow_quit && (cmd == "exit" || cmd == "quit" || cmd == "bye")) 00755 { 00756 handle_stop(); 00757 } 00758 else if (cmd == "shutdown") 00759 { 00760 m_srv.stop("*** shutdown initiated by " + m_peer + " ***\r\n"); 00761 } 00762 else if (m_enable_cout_cerr && (cmd == "cerr" || cmd == "cout")) 00763 { 00764 bool *var = cmd == "cerr" ? &m_cerr_on : &m_cout_on; 00765 00766 if (args == "on") 00767 { 00768 *var = true; 00769 } 00770 else if (args == "off") 00771 { 00772 *var = false; 00773 } 00774 else if (args != "") 00775 { 00776 rv = "invalid argument for " + cmd; 00777 return true; 00778 } 00779 00780 rv = cmd + " set to [" + (*var ? "on" : "off") + "]"; 00781 } 00782 else if (cmd == "help") 00783 { 00784 get_help(rv); 00785 } 00786 else if (m_enable_cls && (cmd == "clear" || cmd == "cls")) 00787 { 00788 rv = "\033[2J\033[H"; 00789 } 00790 else if (cmd == "sessions") 00791 { 00792 m_srv.get_sessions_list(rv); 00793 } 00794 else 00795 { 00796 try 00797 { 00798 return m_app->handle_command(rv, cmd, args); 00799 } 00800 catch (const std::exception& e) 00801 { 00802 rv = "exception while running appender command " + cmd + ": " + e.what(); 00803 } 00804 catch (...) 00805 { 00806 rv = "unknown exception while running appender command " + cmd; 00807 } 00808 } 00809 00810 return true; 00811 } 00812 00813 bool session_base::read_next_command(std::string& cmd, std::string& args) 00814 { 00815 // do we have a full line? 00816 00817 std::string::size_type pos = m_inbuf.find('\n'); 00818 00819 if (pos == m_inbuf.npos) 00820 { 00821 return false; 00822 } 00823 00824 // strip comments 00825 00826 std::string::size_type cpos = m_inbuf.find('#'); 00827 00828 if (cpos != m_inbuf.npos && cpos < pos) 00829 { 00830 m_inbuf.replace(cpos, pos - cpos, "", 0); 00831 pos = cpos; 00832 } 00833 00834 // parse command and arguments 00835 00836 std::istringstream iss(m_inbuf); 00837 00838 m_inbuf.replace(0, pos + 1, "", 0); 00839 00840 iss >> cmd; 00841 00842 getline(iss, args); 00843 trim(args); 00844 00845 return true; 00846 } 00847 00848 /**********************************************************************/ 00849 00850 remote_shell_server_impl::remote_shell_server_impl(shared_ptr<asio::io_service> ios) 00851 : m_ios(ios) 00852 , m_app_factory(new null_appender_factory) 00853 #ifdef BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR 00854 , m_stdout(*ios) 00855 , m_stderr(*ios) 00856 #endif 00857 , m_default_stdout_state(true) 00858 , m_default_stderr_state(true) 00859 , m_enable_local(false) 00860 , m_can_snoop_stdio(false) 00861 , m_listen_port(0) 00862 , m_pre_shutdown_func(noop_pre_shutdown_func()) 00863 { 00864 } 00865 00866 remote_shell_server_impl::~remote_shell_server_impl() 00867 { 00868 } 00869 00870 #ifdef BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR 00871 bool remote_shell_server_impl::snoop_stdio(posix_stream_stealer& stealer, FILE *stream, stdstream *bsd, session_meth cb) 00872 { 00873 if (!stealer.steal(stream)) 00874 { 00875 return false; 00876 } 00877 00878 bsd->assign(stealer.get_pipe_fd()); 00879 00880 stdio_read_more(bsd, cb, new char[BUFSIZ]); 00881 00882 return true; 00883 } 00884 00885 void remote_shell_server_impl::on_stdio_read(stdstream *bsd, session_meth cb, const system::error_code& error, char *buffer, size_t count) 00886 { 00887 if (error) 00888 { 00889 delete[] buffer; 00890 return; 00891 } 00892 00893 foreach(session_ptr s, m_sessions) 00894 { 00895 (s.get()->*cb)(buffer, count); 00896 } 00897 00898 stdio_read_more(bsd, cb, buffer); 00899 } 00900 00901 void remote_shell_server_impl::stdio_read_more(stdstream *bsd, session_meth cb, char *buffer) 00902 { 00903 bsd->async_read_some(asio::buffer(buffer, BUFSIZ), 00904 bind(&remote_shell_server_impl::on_stdio_read, this, bsd, cb, 00905 asio::placeholders::error, buffer, asio::placeholders::bytes_transferred)); 00906 } 00907 #endif 00908 00909 bool remote_shell_server_impl::setup_stdio_snoopers() 00910 { 00911 #ifdef BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR 00912 if (!snoop_stdio(m_stdout_stealer, stdout, &m_stdout, &session_base::add_stdout) || 00913 !snoop_stdio(m_stderr_stealer, stderr, &m_stderr, &session_base::add_stderr)) 00914 { 00915 return false; 00916 } 00917 00918 // make sure our stolen streams behave like "standard" stdout/stderr 00919 setvbuf(stdout, NULL, _IOLBF, 0); 00920 setvbuf(stderr, NULL, _IONBF, 0); 00921 00922 return true; 00923 #else 00924 return false; 00925 #endif 00926 } 00927 00928 void remote_shell_server_impl::teardown_stdio_snoopers() 00929 { 00930 #ifdef BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR 00931 m_stdout_stealer.restore(); 00932 m_stdout.close(); 00933 m_stderr_stealer.restore(); 00934 m_stderr.close(); 00935 #endif 00936 } 00937 00938 bool remote_shell_server_impl::create_console_session(remote_shell_iface *rsi, session_ptr& new_session, stream_writer_ptr& writer) 00939 { 00940 #if defined(BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR) && defined(_POSIX_SOURCE) 00941 00942 int orig_in_fd = ::fileno(stdin); 00943 00944 if (orig_in_fd == -1) 00945 { 00946 return false; 00947 } 00948 00949 int input_fd = ::dup(orig_in_fd); 00950 00951 if (input_fd == -1) 00952 { 00953 return false; 00954 } 00955 00956 // set up console session 00957 shared_ptr<session_io_console> io(new session_io_console(m_ios, input_fd, 00958 m_stdout_stealer.get_backup_fd(), m_stderr_stealer.get_backup_fd(), "local console")); 00959 00960 new_session.reset(new session<session_io_console, false, true>(*this, rsi, io, m_can_snoop_stdio)); 00961 00962 writer.reset(new session_writer<session_io_console>(io)); 00963 00964 return true; 00965 00966 #elif defined(BOOST_ASIO_HAS_WINDOWS_STREAM_HANDLE) && defined(_WIN32) 00967 00968 HANDLE in_hd = ::GetStdHandle(STD_INPUT_HANDLE); 00969 HANDLE out_hd = ::GetStdHandle(STD_OUTPUT_HANDLE); 00970 HANDLE err_hd = ::GetStdHandle(STD_ERROR_HANDLE); 00971 00972 if (in_hd == INVALID_HANDLE_VALUE || 00973 out_hd == INVALID_HANDLE_VALUE || 00974 err_hd == INVALID_HANDLE_VALUE) 00975 { 00976 return false; 00977 } 00978 00979 // set up console session 00980 shared_ptr<session_io_console> io(new session_io_console(m_ios, in_hd, out_hd, err_hd, "local console")); 00981 00982 new_session.reset(new session<session_io_console, false, false>(*this, rsi, io, m_can_snoop_stdio)); 00983 00984 writer.reset(new session_writer<session_io_console>(io)); 00985 00986 return true; 00987 00988 #else 00989 00990 return false; 00991 00992 #endif 00993 } 00994 00995 bool remote_shell_server_impl::setup_console_session(remote_shell_iface *rsi) 00996 { 00997 session_ptr session; 00998 stream_writer_ptr writer; 00999 01000 if (!create_console_session(rsi, session, writer)) 01001 { 01002 return false; 01003 } 01004 01005 session->set_stdout_state(m_default_stdout_state); 01006 session->set_stderr_state(m_default_stderr_state); 01007 01008 m_sessions.insert(session); 01009 01010 session->start(m_app_factory->create(writer)); 01011 01012 return true; 01013 } 01014 01015 void remote_shell_server_impl::stop(const std::string& msg) 01016 { 01017 m_ios->post(bind(&remote_shell_server_impl::handle_stop, this, msg)); 01018 } 01019 01020 void remote_shell_server_impl::handle_accept(socket_ptr socket, remote_shell_iface *rsi, const system::error_code& error) 01021 { 01022 if (!error) 01023 { 01024 shared_ptr<session_io_socket> io(new session_io_socket(socket)); 01025 session_ptr new_session(new session<session_io_socket>(*this, rsi, io, m_can_snoop_stdio)); 01026 01027 stream_writer_ptr writer(new session_writer<session_io_socket>(io)); 01028 01029 new_session->set_stdout_state(m_default_stdout_state); 01030 new_session->set_stderr_state(m_default_stderr_state); 01031 01032 m_sessions.insert(new_session); 01033 01034 new_session->start(m_app_factory->create(writer)); 01035 01036 accept_session(rsi); 01037 } 01038 } 01039 01040 void remote_shell_server_impl::accept_session(remote_shell_iface *rsi) 01041 { 01042 socket_ptr socket(new tcp::socket(*m_ios)); 01043 01044 m_acceptor->async_accept(*socket, 01045 bind(&remote_shell_server_impl::handle_accept, 01046 this, socket, rsi, asio::placeholders::error)); 01047 } 01048 01049 void remote_shell_server_impl::remove_session(session_ptr p) 01050 { 01051 m_sessions.erase(p); 01052 } 01053 01054 void remote_shell_server_impl::handle_stop(const std::string& msg) 01055 { 01056 if (m_acceptor) 01057 { 01058 m_acceptor->close(); 01059 } 01060 01061 foreach(session_ptr s, m_sessions) 01062 { 01063 m_ios->post(bind(&session_base::stop, s, msg)); 01064 } 01065 01066 m_ios->post(bind(&remote_shell_server_impl::pre_shutdown, this)); 01067 } 01068 01069 void remote_shell_server_impl::pre_shutdown() 01070 { 01071 teardown_stdio_snoopers(); 01072 m_pre_shutdown_func(); 01073 } 01074 01075 void remote_shell_server_impl::get_sessions_list(std::string& rv) 01076 { 01077 std::ostringstream oss; 01078 01079 foreach(session_ptr s, m_sessions) 01080 { 01081 oss << s->get_info() << "\r\n"; 01082 } 01083 01084 rv = oss.str(); 01085 } 01086 01087 void remote_shell_server_impl::process_command(session_ptr session, const std::string& cmd, const std::string& args) 01088 { 01089 { 01090 mutex::scoped_lock lock(m_cmd_queue_mutex); 01091 m_cmd_queue.push(command(session, cmd, args)); 01092 } 01093 01094 m_cmd_queue_cond.notify_one(); 01095 } 01096 01097 void remote_shell_server_impl::command_thread(remote_shell_iface *rsi) 01098 { 01099 /* 01100 * Commands may need some time to execute, thus they are moved to a separate 01101 * thread (this one) in order to avoid: 01102 * 01103 * - blocking of any stdout/stderr output 01104 * - blocking of any other client sessions 01105 */ 01106 01107 while (true) 01108 { 01109 command cmd; 01110 01111 { 01112 mutex::scoped_lock lock(m_cmd_queue_mutex); 01113 01114 if (m_cmd_queue.empty()) 01115 { 01116 m_cmd_queue_cond.wait(lock); 01117 } 01118 01119 if (m_cmd_queue.empty()) 01120 { 01121 continue; 01122 } 01123 01124 cmd = m_cmd_queue.front(); 01125 m_cmd_queue.pop(); 01126 } 01127 01128 if (!cmd.session) 01129 { 01130 break; 01131 } 01132 01133 std::string rv; 01134 bool handled = true; 01135 01136 try 01137 { 01138 handled = rsi->handle_command(rv, cmd.cmd, cmd.args); 01139 } 01140 catch (const std::exception& e) 01141 { 01142 std::ostringstream oss; 01143 oss << "exception while running " << cmd.cmd << " command: " << e.what(); 01144 rv = oss.str(); 01145 } 01146 catch (...) 01147 { 01148 std::ostringstream oss; 01149 oss << "unknown exception while running " << cmd.cmd << " command"; 01150 rv = oss.str(); 01151 } 01152 01153 if (m_sessions.find(cmd.session) != m_sessions.end()) 01154 { 01155 m_ios->post(bind(&session_base::command_result, cmd.session, handled, cmd.cmd, rv)); 01156 } 01157 } 01158 } 01159 01160 void remote_shell_server_impl::set_appender_factory(appender_factory_ptr app_factory) 01161 { 01162 m_app_factory = app_factory; 01163 } 01164 01165 void remote_shell_server_impl::set_default_stdout_state(bool enabled) 01166 { 01167 m_default_stdout_state = enabled; 01168 } 01169 01170 void remote_shell_server_impl::set_default_stderr_state(bool enabled) 01171 { 01172 m_default_stderr_state = enabled; 01173 } 01174 01175 void remote_shell_server_impl::set_listen_port(unsigned short port) 01176 { 01177 m_listen_port = port; 01178 } 01179 01180 void remote_shell_server_impl::set_pre_shutdown_function(boost::function0<void>& func) 01181 { 01182 m_pre_shutdown_func = func; 01183 } 01184 01185 void remote_shell_server_impl::enable_local_shell(bool enabled) 01186 { 01187 m_enable_local = enabled; 01188 } 01189 01190 void remote_shell_server_impl::run(remote_shell_iface *rsi) 01191 { 01192 if (m_listen_port == 0 && !m_enable_local) 01193 { 01194 throw std::runtime_error("invalid configuration (no local and no remote port)"); 01195 } 01196 01197 std::ostringstream oss; 01198 01199 m_can_snoop_stdio = setup_stdio_snoopers(); 01200 01201 if (!m_can_snoop_stdio) 01202 { 01203 oss << "stdio snooping not available\r\n"; 01204 } 01205 01206 m_welcome = oss.str(); 01207 01208 if (m_enable_local) 01209 { 01210 if (!setup_console_session(rsi)) 01211 { 01212 throw std::runtime_error("failed to set up local shell session"); 01213 } 01214 } 01215 01216 if (m_listen_port != 0) 01217 { 01218 m_acceptor.reset(new tcp::acceptor(*m_ios, tcp::endpoint(tcp::v4(), m_listen_port))); 01219 accept_session(rsi); 01220 } 01221 01222 m_cmd_runner.reset(new thread(boost::bind(&remote_shell_server_impl::command_thread, this, rsi))); 01223 01224 m_ios->run(); 01225 01226 { 01227 mutex::scoped_lock lock(m_cmd_queue_mutex); 01228 m_cmd_queue.push(command()); // shutdown thread 01229 } 01230 01231 m_cmd_queue_cond.notify_one(); 01232 m_cmd_runner->join(); 01233 } 01234 01235 /**********************************************************************/ 01236 01237 remote_shell_server::remote_shell_server(shared_ptr<asio::io_service> ios) 01238 : m_impl(new remote_shell_server_impl(ios)) 01239 { 01240 } 01241 01242 remote_shell_server::~remote_shell_server() 01243 { 01244 delete m_impl; 01245 } 01246 01247 void remote_shell_server::run(remote_shell_iface *rsi) 01248 { 01249 m_impl->run(rsi); 01250 } 01251 01252 void remote_shell_server::stop(const std::string& msg) 01253 { 01254 m_impl->stop(msg); 01255 } 01256 01257 void remote_shell_server::set_appender_factory(appender_factory_ptr app_factory) 01258 { 01259 m_impl->set_appender_factory(app_factory); 01260 } 01261 01262 void remote_shell_server::set_default_stdout_state(bool enabled) 01263 { 01264 m_impl->set_default_stdout_state(enabled); 01265 } 01266 01267 void remote_shell_server::set_default_stderr_state(bool enabled) 01268 { 01269 m_impl->set_default_stderr_state(enabled); 01270 } 01271 01272 void remote_shell_server::set_listen_port(unsigned short port) 01273 { 01274 m_impl->set_listen_port(port); 01275 } 01276 01277 void remote_shell_server::set_pre_shutdown_function(boost::function0<void>& func) 01278 { 01279 m_impl->set_pre_shutdown_function(func); 01280 } 01281 01282 void remote_shell_server::enable_local_shell(bool enabled) 01283 { 01284 m_impl->enable_local_shell(enabled); 01285 } 01286