libmoost
/home/mhx/git/github/libmoost/src/service/remote_shell.cpp
Go to the documentation of this file.
00001 /* vim:set ts=3 sw=3 sts=3 et: */
00028 #include <cstdio>
00029 #include <set>
00030 #include <queue>
00031 #include <string>
00032 #include <stdexcept>
00033 #include <boost/asio.hpp>
00034 #include <boost/bind.hpp>
00035 #include <boost/enable_shared_from_this.hpp>
00036 #include <boost/noncopyable.hpp>
00037 #include <boost/foreach.hpp>
00038 #include <boost/algorithm/string/trim.hpp>
00039 #include <boost/thread.hpp>
00040 #include <boost/date_time/posix_time/posix_time.hpp>
00041 
00042 #include "../../include/moost/terminal_format.hpp"
00043 #include "../../include/moost/utils/foreach.hpp"
00044 #include "../../include/moost/io/helper.hpp"
00045 #include "../../include/moost/io/async_stream_forwarder.hpp"
00046 #include "../../include/moost/service/remote_shell.h"
00047 #include "../../include/moost/service/posix_stream_stealer.h"
00048 
00049 #if defined(_POSIX_SOURCE) || defined(__CYGWIN__)
00050 // needed for ::dup(), ::write(), ::fileno()
00051 # include <unistd.h>
00052 #elif defined(_WIN32)
00053 # include <windows.h>
00054 #else
00055 # error "apparently no local shell support has been added for this platform"
00056 #endif
00057 
00058 /*
00059  *  TODO: cleanup stdstream/dup/fileno/read/write/pipe stuff
00060  */
00061 
00062 using boost::asio::ip::tcp;
00063 using namespace boost;
00064 using namespace moost;
00065 using namespace moost::service;
00066 
00067 class session_base;
00068 typedef shared_ptr<session_base> session_ptr;
00069 typedef shared_ptr<tcp::socket> socket_ptr;
00070 typedef void (session_base::*session_meth)(const char *buffer, size_t count);
00071 
00072 namespace moost { namespace service {
00073 
00074 class remote_shell_server_impl : public noncopyable
00075 {
00076 private:
00077    struct noop_pre_shutdown_func
00078    {
00079       void operator()()
00080       {
00081       }
00082    };
00083 
00084    struct command
00085    {
00086       session_ptr session;
00087       std::string cmd;
00088       std::string args;
00089 
00090       command()
00091       {}
00092 
00093       command(const session_ptr& s, const std::string& c, const std::string& a)
00094          : session(s), cmd(c), args(a)
00095       {}
00096    };
00097 
00098 #ifdef BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR
00099    typedef asio::posix::basic_stream_descriptor<> stdstream;
00100 #endif
00101 
00102    void handle_accept(socket_ptr socket, remote_shell_iface *rsi, const system::error_code& error);
00103    void handle_stop(const std::string& msg);
00104    void accept_session(remote_shell_iface *rsi);
00105    void pre_shutdown();
00106 
00107 #ifdef BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR
00108    bool snoop_stdio(posix_stream_stealer& stealer, FILE *stream, stdstream *bsd, session_meth cb);
00109    void on_stdio_read(stdstream *bsd, session_meth cb, const system::error_code& error, char *buffer, size_t count);
00110    void stdio_read_more(stdstream *bsd, session_meth cb, char *buffer);
00111 #endif
00112 
00113    bool setup_stdio_snoopers();
00114    void teardown_stdio_snoopers();
00115 
00116    bool create_console_session(remote_shell_iface *rsi, session_ptr& new_session, stream_writer_ptr& writer);
00117    bool setup_console_session(remote_shell_iface *rsi);
00118 
00119    void command_thread(remote_shell_iface *rsi);
00120 
00121    shared_ptr<asio::io_service> m_ios;
00122    shared_ptr<tcp::acceptor> m_acceptor;
00123    std::set<session_ptr> m_sessions;
00124    appender_factory_ptr m_app_factory;
00125 #ifdef BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR
00126    stdstream m_stdout;
00127    stdstream m_stderr;
00128    posix_stream_stealer m_stdout_stealer;
00129    posix_stream_stealer m_stderr_stealer;
00130 #endif
00131    std::string m_welcome;
00132    bool m_default_stdout_state;
00133    bool m_default_stderr_state;
00134    bool m_enable_local;
00135    bool m_can_snoop_stdio;
00136    unsigned short m_listen_port;
00137    shared_ptr<thread> m_cmd_runner;
00138    std::queue< command > m_cmd_queue;
00139    mutex m_cmd_queue_mutex;
00140    condition_variable m_cmd_queue_cond;
00141    boost::function0<void> m_pre_shutdown_func;
00142 
00143 public:
00144    remote_shell_server_impl(shared_ptr<asio::io_service> ios);
00145    ~remote_shell_server_impl();
00146    void run(remote_shell_iface *rsi);
00147    void stop(const std::string& msg);
00148    void remove_session(session_ptr p);
00149    void set_appender_factory(appender_factory_ptr app_factory);
00150    void set_default_stdout_state(bool enabled);
00151    void set_default_stderr_state(bool enabled);
00152    void set_listen_port(unsigned short port);
00153    void set_pre_shutdown_function(boost::function0<void>& func);
00154    void enable_local_shell(bool enabled);
00155    void get_sessions_list(std::string& rv);
00156    void process_command(session_ptr session, const std::string& cmd, const std::string& args);
00157 
00158    const std::string& welcome() const
00159    {
00160       return m_welcome;
00161    }
00162 };
00163 
00164 } }
00165 
00166 class session_io_socket
00167 {
00168 public:
00169    session_io_socket(socket_ptr socket)
00170      : m_socket(socket)
00171    {
00172    }
00173 
00174    void set_nodelay()
00175    {
00176       m_socket->set_option(tcp::no_delay(true));
00177    }
00178 
00179    std::string get_peer_string() const
00180    {
00181       std::ostringstream oss;
00182       oss << m_socket->remote_endpoint();
00183       return oss.str();
00184    }
00185 
00186    void close()
00187    {
00188       mutex::scoped_lock lock(m_mutex);
00189       m_socket->close();
00190    }
00191 
00192    template <typename HandlerT>
00193    void write_stdout(const std::string& data, HandlerT handler)
00194    {
00195       mutex::scoped_lock lock(m_mutex);
00196       asio::async_write(*m_socket, asio::buffer(data.c_str(), data.length()), handler);
00197    }
00198 
00199    template <typename HandlerT>
00200    void write_stderr(const std::string& data, HandlerT handler)
00201    {
00202       mutex::scoped_lock lock(m_mutex);
00203       asio::async_write(*m_socket, asio::buffer(data.c_str(), data.length()), handler);
00204    }
00205 
00206    template <typename HandlerT>
00207    void read_some(void *data, size_t size, HandlerT handler)
00208    {
00209       mutex::scoped_lock lock(m_mutex);
00210       m_socket->async_read_some(asio::buffer(data, size), handler);
00211    }
00212 
00213 private:
00214    socket_ptr m_socket;
00215    mutex m_mutex;
00216 };
00217 
00218 class session_io_console
00219 {
00220 public:
00221    typedef moost::io::helper::native_io_t native_io_t;
00222 
00223    session_io_console(shared_ptr<asio::io_service> ios, native_io_t in_fd, native_io_t out_fd, native_io_t err_fd, const std::string& name)
00224      : m_ios(ios)
00225      , m_in_fwd(ios, in_fd)
00226      , m_out(out_fd)
00227      , m_err(err_fd)
00228      , m_name(name)
00229    {
00230    }
00231 
00232    void set_nodelay()
00233    {
00234       // nothing to do here
00235    }
00236 
00237    const std::string& get_peer_string() const
00238    {
00239       return m_name;
00240    }
00241 
00242    void close()
00243    {
00244       // this is required to make sure the async read finishes
00245       m_in_fwd.close();
00246    }
00247 
00248    template <typename HandlerT>
00249    void write_stdout(const std::string& data, HandlerT handler)
00250    {
00251       write_console(m_out, data, handler);
00252    }
00253 
00254    template <typename HandlerT>
00255    void write_stderr(const std::string& data, HandlerT handler)
00256    {
00257       write_console(m_err, data, handler);
00258    }
00259 
00260    template <typename HandlerT>
00261    void read_some(void *data, size_t size, HandlerT handler)
00262    {
00263       m_in_fwd.read_async(data, size, handler);
00264    }
00265 
00266 private:
00267    template <typename HandlerT>
00268    void write_console(native_io_t fd, const std::string& data, HandlerT handler)
00269    {
00270       // We can't use boost::asio for this, as we could be writing
00271       // to a regular file, for which the asio reactor (e.g. epoll
00272       // or poll) might not have support.
00273 
00274       // So we just use the native, synchronous file i/o, but fake
00275       // it to behave as similar to boost::asio as possible.
00276 
00277       boost::system::error_code ec;
00278       size_t written = 0;
00279 
00280       {
00281          mutex::scoped_lock lock(m_mutex);
00282 
00283          if (!moost::io::helper::write(fd, data.c_str(), data.length(), &written))
00284          {
00285             ec.assign(moost::io::helper::error(), boost::asio::error::get_system_category());
00286             written = 0;
00287          }
00288       }
00289 
00290       // Make sure the handler is called synchronously, modelling
00291       // the behaviour of boost::asio.
00292 
00293       m_ios->post(bind(handler, ec, static_cast<std::size_t>(written)));
00294    }
00295 
00296    mutex m_mutex;
00297    shared_ptr<asio::io_service> m_ios;
00298    moost::io::async_stream_forwarder m_in_fwd;
00299    native_io_t m_out;
00300    native_io_t m_err;
00301    const std::string m_name;
00302 };
00303 
00304 template <class SessionIoT>
00305 class session_writer : public stream_writer_iface
00306 {
00307 public:
00308    session_writer(shared_ptr<SessionIoT> io)
00309      : m_io(io)
00310    {
00311    }
00312 
00313    virtual void write(const char *data, size_t len)
00314    {
00315       std::string *s = new std::string(data, len);
00316       m_io->write_stdout(*s, bind(&session_writer<SessionIoT>::on_write_done, s));
00317    }
00318 
00319 private:
00320    static void on_write_done(std::string *str)
00321    {
00322       delete str;
00323    }
00324 
00325    shared_ptr<SessionIoT> m_io;
00326 };
00327 
00328 class session_base : public enable_shared_from_this<session_base>, public noncopyable
00329 {
00330 protected:
00331    enum state {
00332       SESSION_CREATED,
00333       SESSION_ATTACHED,
00334       SESSION_STOPPING,
00335       SESSION_STOPPED
00336    };
00337 
00338 public:
00339    session_base(remote_shell_server_impl& srv, remote_shell_iface *rsi, bool allow_quit, bool enable_cout_cerr, bool enable_cls)
00340       : m_srv(srv)
00341       , m_state(SESSION_CREATED)
00342       , m_rsi(rsi)
00343       , m_cout_on(true)
00344       , m_cerr_on(true)
00345       , m_processing_input(true)
00346       , m_allow_quit(allow_quit)
00347       , m_enable_cout_cerr(enable_cout_cerr)
00348       , m_enable_cls(enable_cls)
00349       , m_t_connect(boost::posix_time::second_clock::universal_time())
00350    {
00351    }
00352 
00353    virtual ~session_base()
00354    {
00355    }
00356 
00357    void start(appender_ptr appender)
00358    {
00359       set_nodelay();
00360       m_app = appender;
00361       m_app->attach();
00362       m_state = SESSION_ATTACHED;
00363       m_peer = get_peer_string();
00364       continue_session("accepted client connection from " + m_peer + "\r\n" + m_srv.welcome());
00365       read_more();
00366    }
00367 
00368    void stop(const std::string& msg)
00369    {
00370       if (is_attached())
00371       {
00372          write("\r\n" + msg, SESSION_STOPPING);
00373       }
00374    }
00375 
00376    void set_stdout_state(bool on)
00377    {
00378       m_cout_on = on;
00379    }
00380 
00381    void set_stderr_state(bool on)
00382    {
00383       m_cerr_on = on;
00384    }
00385 
00386    std::string get_info() const
00387    {
00388       boost::posix_time::time_duration duration = boost::posix_time::second_clock::universal_time() - m_t_connect;
00389       std::ostringstream oss;
00390 
00391       oss << m_peer << " [" << duration << "]";
00392 
00393       return oss.str();
00394    }
00395 
00396    void add_stdout(const char *buffer, size_t count);
00397    void add_stderr(const char *buffer, size_t count);
00398    void command_result(bool handled, const std::string& cmd, const std::string& rv);
00399 
00400 protected:
00401    virtual void set_nodelay() = 0;
00402    virtual std::string get_peer_string() const = 0;
00403    virtual void close() = 0;
00404    virtual void read_more(char *data, size_t max) = 0;
00405    virtual void write_stdout(std::string *s, state st) = 0;
00406    virtual void write_stderr(std::string *s, state st) = 0;
00407 
00408    void on_write_done(std::string *s, state st, const system::error_code& error);
00409    void on_read_done(const system::error_code& error, size_t bytes_transferred);
00410 
00411 private:
00412    void read_more()
00413    {
00414       read_more(m_data, max_length);
00415    }
00416 
00417    void write(const std::string& str)
00418    {
00419       write(str, m_state);
00420    }
00421 
00422    void write(const std::string& str, state st)
00423    {
00424       std::string *s = new std::string(str);
00425       write_stdout(s, st);
00426    }
00427 
00428    bool is_attached() const
00429    {
00430       return m_state >= SESSION_ATTACHED && m_state < SESSION_STOPPED;
00431    }
00432 
00433    bool is_stopped() const
00434    {
00435       return m_state >= SESSION_STOPPED;
00436    }
00437 
00438    template <typename T>
00439    void get_more_help(std::ostream& os, const T& obj) const;
00440 
00441    void get_help(std::string& rv) const;
00442    void continue_session(const std::string& str = "");
00443    void handle_stop();
00444    bool handle_command(std::string& rv, const std::string& cmd, const std::string& args);
00445    bool read_next_command(std::string& cmd, std::string& args);
00446    bool log_level(std::string& rv, const std::string& args);
00447    void process_input();
00448 
00449    remote_shell_server_impl& m_srv;
00450    std::string m_peer;
00451    enum { max_length = 1024 };
00452    char m_data[max_length];
00453    appender_ptr m_app;
00454    enum state m_state;
00455    remote_shell_iface * const m_rsi;
00456    std::string m_inbuf;
00457    bool m_cout_on;
00458    bool m_cerr_on;
00459    bool m_processing_input;
00460    const bool m_allow_quit;
00461    const bool m_enable_cout_cerr;
00462    const bool m_enable_cls;
00463    const boost::posix_time::ptime m_t_connect;
00464 };
00465 
00466 template <class SessionIoT, bool AllowQuit = true, bool EnableCLS = true>
00467 class session : public session_base
00468 {
00469 public:
00470    session(remote_shell_server_impl& srv, remote_shell_iface *rsi, shared_ptr<SessionIoT> io, bool enable_cout_cerr = true)
00471       : session_base(srv, rsi, AllowQuit, enable_cout_cerr, EnableCLS)
00472       , m_io(io)
00473    {
00474    }
00475 
00476 protected:
00477    virtual void set_nodelay()
00478    {
00479       m_io->set_nodelay();
00480    }
00481 
00482    virtual std::string get_peer_string() const
00483    {
00484       return m_io->get_peer_string();
00485    }
00486 
00487    virtual void close()
00488    {
00489       return m_io->close();
00490    }
00491 
00492    virtual void read_more(char *data, size_t max)
00493    {
00494       m_io->read_some(data, max, bind(&session::on_read_done, shared_from_this(),
00495             asio::placeholders::error, asio::placeholders::bytes_transferred));
00496    }
00497 
00498    virtual void write_stdout(std::string *s, state st)
00499    {
00500       m_io->write_stdout(*s, bind(&session::on_write_done, shared_from_this(), s, st, asio::placeholders::error));
00501    }
00502 
00503    virtual void write_stderr(std::string *s, state st)
00504    {
00505       m_io->write_stderr(*s, bind(&session::on_write_done, shared_from_this(), s, st, asio::placeholders::error));
00506    }
00507 
00508 private:
00509    shared_ptr<SessionIoT> m_io;
00510 };
00511 
00512 /**********************************************************************/
00513 
00514 void session_base::add_stdout(const char *buffer, size_t count)
00515 {
00516    if (m_cout_on)
00517    {
00518       std::string *s = new std::string();
00519       s->append(terminal_format::color(C_GREEN));
00520       s->append(buffer, count);
00521       s->append(terminal_format::reset());
00522       write_stdout(s, m_state);
00523    }
00524 }
00525 
00526 void session_base::add_stderr(const char *buffer, size_t count)
00527 {
00528    if (m_cerr_on)
00529    {
00530       std::string *s = new std::string();
00531       s->append(terminal_format::color(C_RED));
00532       s->append(terminal_format::bold());
00533       s->append(buffer, count);
00534       s->append(terminal_format::reset());
00535       write_stderr(s, m_state);
00536    }
00537 }
00538 
00539 template <typename T>
00540 void session_base::get_more_help(std::ostream& os, const T& obj) const
00541 {
00542    std::string help;
00543 
00544    try
00545    {
00546       help = obj.show_help();
00547    }
00548    catch (const std::exception& e)
00549    {
00550       help = std::string("exception while getting help: ") + e.what() + "\r\n";
00551    }
00552    catch (...)
00553    {
00554       help = "unknown exception while getting help\r\n";
00555    }
00556 
00557    if (!help.empty())
00558    {
00559       os << "-------------------------------------------------------\r\n"
00560          << help;
00561    }
00562 }
00563 
00564 void session_base::get_help(std::string& rv) const
00565 {
00566    std::ostringstream oss;
00567 
00568    oss <<    "=======================================================\r\n"
00569              "- help              show this help\r\n";
00570 
00571    if (m_enable_cout_cerr)
00572    {
00573       oss << "- cout [on|off]     get [set] stdout " << (m_cout_on ? "<ON>/off" : "on/<OFF>") << "\r\n"
00574              "- cerr [on|off]     get [set] stderr " << (m_cerr_on ? "<ON>/off" : "on/<OFF>") << "\r\n";
00575    }
00576 
00577    oss <<    "- sessions          show shell sessions\r\n";
00578 
00579    if (m_enable_cls)
00580    {
00581       oss << "- clear|cls         clear screen\r\n";
00582    }
00583 
00584    if (m_allow_quit)
00585    {
00586       oss << "- quit|exit|bye     quit this connection\r\n";
00587    }
00588 
00589    oss <<    "- shutdown          shut down application\r\n";
00590 
00591    get_more_help(oss, *m_app);
00592    get_more_help(oss, *m_rsi);
00593 
00594    oss <<    "=======================================================\r\n";
00595 
00596    rv = oss.str();
00597 }
00598 
00599 void session_base::on_write_done(std::string *s, state st, const system::error_code& error)
00600 {
00601    if (st > m_state)
00602    {
00603       m_state = st;
00604    }
00605 
00606    delete s;
00607 
00608    if (!error)
00609    {
00610       switch (m_state)
00611       {
00612          case SESSION_ATTACHED:
00613             break;
00614 
00615          case SESSION_STOPPING:
00616             handle_stop();
00617             break;
00618 
00619          default:
00620             handle_stop();
00621             break;
00622       }
00623    }
00624    else if (!is_stopped())
00625    {
00626       handle_stop();
00627    }
00628 }
00629 
00630 void session_base::on_read_done(const system::error_code& error, size_t bytes_transferred)
00631 {
00632    if (error)
00633    {
00634       handle_stop();
00635       return;
00636    }
00637 
00638    m_inbuf.append(m_data, bytes_transferred);
00639 
00640    process_input();
00641 }
00642 
00643 void session_base::process_input()
00644 {
00645    while (m_processing_input)
00646    {
00647       std::string cmd, args;
00648 
00649       if (!read_next_command(cmd, args))
00650       {
00651          read_more();
00652          return;
00653       }
00654 
00655       if (cmd.empty())
00656       {
00657          continue_session();
00658       }
00659       else
00660       {
00661          std::string rv;
00662 
00663          if (handle_command(rv, cmd, args))
00664          {
00665             continue_session(rv);
00666          }
00667          else
00668          {
00669             /*
00670              *  This is either a service command, or an unknown command.
00671              *  (Only the service can tell for sure, unfortunately.)
00672              *  In any case, it potentially accesses a global resource which
00673              *  may not be accessed concurrently from multiple shell sessions.
00674              *  Thus, we route the request through the server who will queue
00675              *  all service commands and process them sequentially in a
00676              *  separate thread in order to avoid blocking of the clients.
00677              */
00678 
00679             m_processing_input = false;
00680 
00681             m_srv.process_command(shared_from_this(), cmd, args);
00682          }
00683       }
00684    }
00685 }
00686 
00687 void session_base::command_result(bool handled, const std::string& cmd, const std::string& rv)
00688 {
00689    if (handled)
00690    {
00691       continue_session(rv);
00692    }
00693    else
00694    {
00695       continue_session("unknown command: " + cmd);
00696    }
00697 
00698    m_processing_input = true;
00699 
00700    process_input();
00701 }
00702 
00703 void session_base::continue_session(const std::string& str)
00704 {
00705    if (!is_stopped())
00706    {
00707       std::ostringstream oss;
00708 
00709       oss << str;
00710 
00711       if (!str.empty() && str[str.length() - 1] != '\n')
00712       {
00713          oss << "\r\n";
00714       }
00715 
00716       try
00717       {
00718          oss << m_rsi->get_prompt();
00719       }
00720       catch (const std::exception& e)
00721       {
00722          oss << "(exception while getting prompt: " << e.what() << ")> ";
00723       }
00724       catch (...)
00725       {
00726          oss << "(unknown exception while getting prompt)> ";
00727       }
00728 
00729       write(oss.str());
00730    }
00731 }
00732 
00733 void session_base::handle_stop()
00734 {
00735    if (!is_stopped())
00736    {
00737       close();
00738 
00739       if (is_attached())
00740       {
00741          m_app->detach();
00742       }
00743 
00744       m_cerr_on = m_cout_on = false;
00745 
00746       m_state = SESSION_STOPPED;
00747 
00748       m_srv.remove_session(shared_from_this());
00749    }
00750 }
00751 
00752 bool session_base::handle_command(std::string& rv, const std::string& cmd, const std::string& args)
00753 {
00754    if (m_allow_quit && (cmd == "exit" || cmd == "quit" || cmd == "bye"))
00755    {
00756       handle_stop();
00757    }
00758    else if (cmd == "shutdown")
00759    {
00760       m_srv.stop("*** shutdown initiated by " + m_peer + " ***\r\n");
00761    }
00762    else if (m_enable_cout_cerr && (cmd == "cerr" || cmd == "cout"))
00763    {
00764       bool *var = cmd == "cerr" ? &m_cerr_on : &m_cout_on;
00765 
00766       if (args == "on")
00767       {
00768          *var = true;
00769       }
00770       else if (args == "off")
00771       {
00772          *var = false;
00773       }
00774       else if (args != "")
00775       {
00776          rv = "invalid argument for " + cmd;
00777          return true;
00778       }
00779 
00780       rv = cmd + " set to [" + (*var ? "on" : "off") + "]";
00781    }
00782    else if (cmd == "help")
00783    {
00784       get_help(rv);
00785    }
00786    else if (m_enable_cls && (cmd == "clear" || cmd == "cls"))
00787    {
00788       rv = "\033[2J\033[H";
00789    }
00790    else if (cmd == "sessions")
00791    {
00792       m_srv.get_sessions_list(rv);
00793    }
00794    else
00795    {
00796       try
00797       {
00798          return m_app->handle_command(rv, cmd, args);
00799       }
00800       catch (const std::exception& e)
00801       {
00802          rv = "exception while running appender command " + cmd + ": " + e.what();
00803       }
00804       catch (...)
00805       {
00806          rv = "unknown exception while running appender command " + cmd;
00807       }
00808    }
00809 
00810    return true;
00811 }
00812 
00813 bool session_base::read_next_command(std::string& cmd, std::string& args)
00814 {
00815    // do we have a full line?
00816 
00817    std::string::size_type pos = m_inbuf.find('\n');
00818 
00819    if (pos == m_inbuf.npos)
00820    {
00821       return false;
00822    }
00823 
00824    // strip comments
00825 
00826    std::string::size_type cpos = m_inbuf.find('#');
00827 
00828    if (cpos != m_inbuf.npos && cpos < pos)
00829    {
00830       m_inbuf.replace(cpos, pos - cpos, "", 0);
00831       pos = cpos;
00832    }
00833 
00834    // parse command and arguments
00835 
00836    std::istringstream iss(m_inbuf);
00837 
00838    m_inbuf.replace(0, pos + 1, "", 0);
00839 
00840    iss >> cmd;
00841 
00842    getline(iss, args);
00843    trim(args);
00844 
00845    return true;
00846 }
00847 
00848 /**********************************************************************/
00849 
00850 remote_shell_server_impl::remote_shell_server_impl(shared_ptr<asio::io_service> ios)
00851    : m_ios(ios)
00852    , m_app_factory(new null_appender_factory)
00853 #ifdef BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR
00854    , m_stdout(*ios)
00855    , m_stderr(*ios)
00856 #endif
00857    , m_default_stdout_state(true)
00858    , m_default_stderr_state(true)
00859    , m_enable_local(false)
00860    , m_can_snoop_stdio(false)
00861    , m_listen_port(0)
00862    , m_pre_shutdown_func(noop_pre_shutdown_func())
00863 {
00864 }
00865 
00866 remote_shell_server_impl::~remote_shell_server_impl()
00867 {
00868 }
00869 
00870 #ifdef BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR
00871 bool remote_shell_server_impl::snoop_stdio(posix_stream_stealer& stealer, FILE *stream, stdstream *bsd, session_meth cb)
00872 {
00873    if (!stealer.steal(stream))
00874    {
00875       return false;
00876    }
00877 
00878    bsd->assign(stealer.get_pipe_fd());
00879 
00880    stdio_read_more(bsd, cb, new char[BUFSIZ]);
00881 
00882    return true;
00883 }
00884 
00885 void remote_shell_server_impl::on_stdio_read(stdstream *bsd, session_meth cb, const system::error_code& error, char *buffer, size_t count)
00886 {
00887    if (error)
00888    {
00889       delete[] buffer;
00890       return;
00891    }
00892 
00893    foreach(session_ptr s, m_sessions)
00894    {
00895       (s.get()->*cb)(buffer, count);
00896    }
00897 
00898    stdio_read_more(bsd, cb, buffer);
00899 }
00900 
00901 void remote_shell_server_impl::stdio_read_more(stdstream *bsd, session_meth cb, char *buffer)
00902 {
00903    bsd->async_read_some(asio::buffer(buffer, BUFSIZ),
00904        bind(&remote_shell_server_impl::on_stdio_read, this, bsd, cb,
00905          asio::placeholders::error, buffer, asio::placeholders::bytes_transferred));
00906 }
00907 #endif
00908 
00909 bool remote_shell_server_impl::setup_stdio_snoopers()
00910 {
00911 #ifdef BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR
00912    if (!snoop_stdio(m_stdout_stealer, stdout, &m_stdout, &session_base::add_stdout) ||
00913        !snoop_stdio(m_stderr_stealer, stderr, &m_stderr, &session_base::add_stderr))
00914    {
00915       return false;
00916    }
00917 
00918    // make sure our stolen streams behave like "standard" stdout/stderr
00919    setvbuf(stdout, NULL, _IOLBF, 0);
00920    setvbuf(stderr, NULL, _IONBF, 0);
00921 
00922    return true;
00923 #else
00924    return false;
00925 #endif
00926 }
00927 
00928 void remote_shell_server_impl::teardown_stdio_snoopers()
00929 {
00930 #ifdef BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR
00931    m_stdout_stealer.restore();
00932    m_stdout.close();
00933    m_stderr_stealer.restore();
00934    m_stderr.close();
00935 #endif
00936 }
00937 
00938 bool remote_shell_server_impl::create_console_session(remote_shell_iface *rsi, session_ptr& new_session, stream_writer_ptr& writer)
00939 {
00940 #if defined(BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR) && defined(_POSIX_SOURCE)
00941 
00942    int orig_in_fd = ::fileno(stdin);
00943 
00944    if (orig_in_fd == -1)
00945    {
00946       return false;
00947    }
00948 
00949    int input_fd = ::dup(orig_in_fd);
00950 
00951    if (input_fd == -1)
00952    {
00953       return false;
00954    }
00955 
00956    // set up console session
00957    shared_ptr<session_io_console> io(new session_io_console(m_ios, input_fd,
00958       m_stdout_stealer.get_backup_fd(), m_stderr_stealer.get_backup_fd(), "local console"));
00959 
00960    new_session.reset(new session<session_io_console, false, true>(*this, rsi, io, m_can_snoop_stdio));
00961 
00962    writer.reset(new session_writer<session_io_console>(io));
00963 
00964    return true;
00965 
00966 #elif defined(BOOST_ASIO_HAS_WINDOWS_STREAM_HANDLE) && defined(_WIN32)
00967 
00968    HANDLE in_hd = ::GetStdHandle(STD_INPUT_HANDLE);
00969    HANDLE out_hd = ::GetStdHandle(STD_OUTPUT_HANDLE);
00970    HANDLE err_hd = ::GetStdHandle(STD_ERROR_HANDLE);
00971 
00972    if (in_hd == INVALID_HANDLE_VALUE ||
00973        out_hd == INVALID_HANDLE_VALUE ||
00974        err_hd == INVALID_HANDLE_VALUE)
00975    {
00976       return false;
00977    }
00978 
00979    // set up console session
00980    shared_ptr<session_io_console> io(new session_io_console(m_ios, in_hd, out_hd, err_hd, "local console"));
00981 
00982    new_session.reset(new session<session_io_console, false, false>(*this, rsi, io, m_can_snoop_stdio));
00983 
00984    writer.reset(new session_writer<session_io_console>(io));
00985 
00986    return true;
00987 
00988 #else
00989 
00990    return false;
00991 
00992 #endif
00993 }
00994 
00995 bool remote_shell_server_impl::setup_console_session(remote_shell_iface *rsi)
00996 {
00997    session_ptr session;
00998    stream_writer_ptr writer;
00999 
01000    if (!create_console_session(rsi, session, writer))
01001    {
01002       return false;
01003    }
01004 
01005    session->set_stdout_state(m_default_stdout_state);
01006    session->set_stderr_state(m_default_stderr_state);
01007 
01008    m_sessions.insert(session);
01009 
01010    session->start(m_app_factory->create(writer));
01011 
01012    return true;
01013 }
01014 
01015 void remote_shell_server_impl::stop(const std::string& msg)
01016 {
01017    m_ios->post(bind(&remote_shell_server_impl::handle_stop, this, msg));
01018 }
01019 
01020 void remote_shell_server_impl::handle_accept(socket_ptr socket, remote_shell_iface *rsi, const system::error_code& error)
01021 {
01022    if (!error)
01023    {
01024       shared_ptr<session_io_socket> io(new session_io_socket(socket));
01025       session_ptr new_session(new session<session_io_socket>(*this, rsi, io, m_can_snoop_stdio));
01026 
01027       stream_writer_ptr writer(new session_writer<session_io_socket>(io));
01028 
01029       new_session->set_stdout_state(m_default_stdout_state);
01030       new_session->set_stderr_state(m_default_stderr_state);
01031 
01032       m_sessions.insert(new_session);
01033 
01034       new_session->start(m_app_factory->create(writer));
01035 
01036       accept_session(rsi);
01037    }
01038 }
01039 
01040 void remote_shell_server_impl::accept_session(remote_shell_iface *rsi)
01041 {
01042    socket_ptr socket(new tcp::socket(*m_ios));
01043 
01044    m_acceptor->async_accept(*socket,
01045        bind(&remote_shell_server_impl::handle_accept,
01046          this, socket, rsi, asio::placeholders::error));
01047 }
01048 
01049 void remote_shell_server_impl::remove_session(session_ptr p)
01050 {
01051    m_sessions.erase(p);
01052 }
01053 
01054 void remote_shell_server_impl::handle_stop(const std::string& msg)
01055 {
01056    if (m_acceptor)
01057    {
01058       m_acceptor->close();
01059    }
01060 
01061    foreach(session_ptr s, m_sessions)
01062    {
01063       m_ios->post(bind(&session_base::stop, s, msg));
01064    }
01065 
01066    m_ios->post(bind(&remote_shell_server_impl::pre_shutdown, this));
01067 }
01068 
01069 void remote_shell_server_impl::pre_shutdown()
01070 {
01071    teardown_stdio_snoopers();
01072    m_pre_shutdown_func();
01073 }
01074 
01075 void remote_shell_server_impl::get_sessions_list(std::string& rv)
01076 {
01077    std::ostringstream oss;
01078 
01079    foreach(session_ptr s, m_sessions)
01080    {
01081       oss << s->get_info() << "\r\n";
01082    }
01083 
01084    rv = oss.str();
01085 }
01086 
01087 void remote_shell_server_impl::process_command(session_ptr session, const std::string& cmd, const std::string& args)
01088 {
01089    {
01090       mutex::scoped_lock lock(m_cmd_queue_mutex);
01091       m_cmd_queue.push(command(session, cmd, args));
01092    }
01093 
01094    m_cmd_queue_cond.notify_one();
01095 }
01096 
01097 void remote_shell_server_impl::command_thread(remote_shell_iface *rsi)
01098 {
01099    /*
01100     *  Commands may need some time to execute, thus they are moved to a separate
01101     *  thread (this one) in order to avoid:
01102     *
01103     *    - blocking of any stdout/stderr output
01104     *    - blocking of any other client sessions
01105     */
01106 
01107    while (true)
01108    {
01109       command cmd;
01110 
01111       {
01112          mutex::scoped_lock lock(m_cmd_queue_mutex);
01113 
01114          if (m_cmd_queue.empty())
01115          {
01116             m_cmd_queue_cond.wait(lock);
01117          }
01118 
01119          if (m_cmd_queue.empty())
01120          {
01121             continue;
01122          }
01123 
01124          cmd = m_cmd_queue.front();
01125          m_cmd_queue.pop();
01126       }
01127 
01128       if (!cmd.session)
01129       {
01130          break;
01131       }
01132 
01133       std::string rv;
01134       bool handled = true;
01135 
01136       try
01137       {
01138          handled = rsi->handle_command(rv, cmd.cmd, cmd.args);
01139       }
01140       catch (const std::exception& e)
01141       {
01142          std::ostringstream oss;
01143          oss << "exception while running " << cmd.cmd << " command: " << e.what();
01144          rv = oss.str();
01145       }
01146       catch (...)
01147       {
01148          std::ostringstream oss;
01149          oss << "unknown exception while running " << cmd.cmd << " command";
01150          rv = oss.str();
01151       }
01152 
01153       if (m_sessions.find(cmd.session) != m_sessions.end())
01154       {
01155          m_ios->post(bind(&session_base::command_result, cmd.session, handled, cmd.cmd, rv));
01156       }
01157    }
01158 }
01159 
01160 void remote_shell_server_impl::set_appender_factory(appender_factory_ptr app_factory)
01161 {
01162    m_app_factory = app_factory;
01163 }
01164 
01165 void remote_shell_server_impl::set_default_stdout_state(bool enabled)
01166 {
01167    m_default_stdout_state = enabled;
01168 }
01169 
01170 void remote_shell_server_impl::set_default_stderr_state(bool enabled)
01171 {
01172    m_default_stderr_state = enabled;
01173 }
01174 
01175 void remote_shell_server_impl::set_listen_port(unsigned short port)
01176 {
01177    m_listen_port = port;
01178 }
01179 
01180 void remote_shell_server_impl::set_pre_shutdown_function(boost::function0<void>& func)
01181 {
01182    m_pre_shutdown_func = func;
01183 }
01184 
01185 void remote_shell_server_impl::enable_local_shell(bool enabled)
01186 {
01187    m_enable_local = enabled;
01188 }
01189 
01190 void remote_shell_server_impl::run(remote_shell_iface *rsi)
01191 {
01192    if (m_listen_port == 0 && !m_enable_local)
01193    {
01194       throw std::runtime_error("invalid configuration (no local and no remote port)");
01195    }
01196 
01197    std::ostringstream oss;
01198 
01199    m_can_snoop_stdio = setup_stdio_snoopers();
01200 
01201    if (!m_can_snoop_stdio)
01202    {
01203       oss << "stdio snooping not available\r\n";
01204    }
01205 
01206    m_welcome = oss.str();
01207 
01208    if (m_enable_local)
01209    {
01210       if (!setup_console_session(rsi))
01211       {
01212          throw std::runtime_error("failed to set up local shell session");
01213       }
01214    }
01215 
01216    if (m_listen_port != 0)
01217    {
01218       m_acceptor.reset(new tcp::acceptor(*m_ios, tcp::endpoint(tcp::v4(), m_listen_port)));
01219       accept_session(rsi);
01220    }
01221 
01222    m_cmd_runner.reset(new thread(boost::bind(&remote_shell_server_impl::command_thread, this, rsi)));
01223 
01224    m_ios->run();
01225 
01226    {
01227       mutex::scoped_lock lock(m_cmd_queue_mutex);
01228       m_cmd_queue.push(command()); // shutdown thread
01229    }
01230 
01231    m_cmd_queue_cond.notify_one();
01232    m_cmd_runner->join();
01233 }
01234 
01235 /**********************************************************************/
01236 
01237 remote_shell_server::remote_shell_server(shared_ptr<asio::io_service> ios)
01238    : m_impl(new remote_shell_server_impl(ios))
01239 {
01240 }
01241 
01242 remote_shell_server::~remote_shell_server()
01243 {
01244    delete m_impl;
01245 }
01246 
01247 void remote_shell_server::run(remote_shell_iface *rsi)
01248 {
01249    m_impl->run(rsi);
01250 }
01251 
01252 void remote_shell_server::stop(const std::string& msg)
01253 {
01254    m_impl->stop(msg);
01255 }
01256 
01257 void remote_shell_server::set_appender_factory(appender_factory_ptr app_factory)
01258 {
01259    m_impl->set_appender_factory(app_factory);
01260 }
01261 
01262 void remote_shell_server::set_default_stdout_state(bool enabled)
01263 {
01264    m_impl->set_default_stdout_state(enabled);
01265 }
01266 
01267 void remote_shell_server::set_default_stderr_state(bool enabled)
01268 {
01269    m_impl->set_default_stderr_state(enabled);
01270 }
01271 
01272 void remote_shell_server::set_listen_port(unsigned short port)
01273 {
01274    m_impl->set_listen_port(port);
01275 }
01276 
01277 void remote_shell_server::set_pre_shutdown_function(boost::function0<void>& func)
01278 {
01279    m_impl->set_pre_shutdown_function(func);
01280 }
01281 
01282 void remote_shell_server::enable_local_shell(bool enabled)
01283 {
01284    m_impl->enable_local_shell(enabled);
01285 }
01286