libmoost
/home/mhx/git/github/libmoost/include/moost/io/async_stream_forwarder.hpp
Go to the documentation of this file.
00001 /* vim:set ts=3 sw=3 sts=3 et: */
00028 #ifndef MOOST_IO_ASYNC_STREAM_FORWARDER_HPP__
00029 #define MOOST_IO_ASYNC_STREAM_FORWARDER_HPP__
00030 
00040 #include <boost/asio.hpp>
00041 #include <boost/thread.hpp>
00042 #include <boost/shared_ptr.hpp>
00043 
00044 #include "helper.hpp"
00045 
00046 #if defined(BOOST_ASIO_HAS_WINDOWS_STREAM_HANDLE) && defined(_WIN32)
00047 # include "detail/async_stream_forwarder_win32.hpp"
00048 #elif defined(BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR) && (defined(_POSIX_SOURCE) || defined(__CYGWIN__))
00049 # include "detail/async_stream_forwarder_posix.hpp"
00050 #else
00051 # error "apparently no stream forwarder support has been added for this platform"
00052 #endif
00053 
00054 namespace moost { namespace io {
00055 
00056 /*
00057  *  Unfortunately, asynchronous reads for standard file streams (e.g. stdin) are a bad idea,
00058  *  so in order to make them work with boost::asio, we need a separate thread that forwards the
00059  *  stream data via an object that supports asynchronous i/o.
00060  */
00061 
00062 class async_stream_forwarder
00063 {
00064 public:
00065    typedef helper::native_io_t native_io_t;
00066    typedef helper::async_stream_t async_stream_t;
00067 
00068    async_stream_forwarder(boost::shared_ptr<boost::asio::io_service> ios)
00069      : m_ios(ios)
00070      , m_input(*ios)
00071    {
00072    }
00073 
00074    async_stream_forwarder(boost::shared_ptr<boost::asio::io_service> ios, native_io_t input, bool dup_input = true)
00075      : m_ios(ios)
00076      , m_input(*ios)
00077    {
00078       assign(input, dup_input);
00079    }
00080 
00081    ~async_stream_forwarder()
00082    {
00083       // just to be on the safe side in case someone forgets to call close() explicitly
00084       try
00085       {
00086          close();
00087       }
00088       catch (...)
00089       {
00090          // close() shouldn't throw at all, but let's make sure we don't throw during destruction
00091       }
00092    }
00093 
00094    void assign(native_io_t input, bool dup_input = true)
00095    {
00096       if (m_input_thread)
00097       {
00098          throw std::runtime_error("attempt to assign handle twice");
00099       }
00100 
00101       m_in = dup_input ? helper::duplicate(input) : input;
00102 
00103       try
00104       {
00105          native_io_t pipe_read;
00106 
00107          helper::create_pipe(pipe_read, m_pipe_write);
00108 
00109          try
00110          {
00111             m_input.assign(pipe_read);
00112 
00113             m_input_thread.reset(new boost::thread(boost::bind(&async_stream_forwarder::input_thread, this)));
00114          }
00115          catch (...)
00116          {
00117             if (m_input.is_open())
00118             {
00119                m_input.close();
00120             }
00121             else
00122             {
00123                helper::close(pipe_read);
00124             }
00125 
00126             throw;
00127          }
00128       }
00129       catch (...)
00130       {
00131          if (dup_input)
00132          {
00133             helper::close(m_in);
00134          }
00135 
00136          throw;
00137       }
00138    }
00139 
00140    template <typename HandlerT>
00141    void read_async(void *data, size_t size, HandlerT handler)
00142    {
00143       m_input.async_read_some(boost::asio::buffer(data, size), handler);
00144    }
00145 
00146    void close()
00147    {
00148       if (m_input_thread)
00149       {
00150          m_input.close();
00151          helper::close(m_in);
00152          helper::close(m_pipe_write);
00153          m_loop.stop();
00154          m_input_thread->join();
00155          m_input_thread.reset();
00156       }
00157    }
00158 
00159 private:
00160    void input_thread() const
00161    {
00162       // Continue forwarding from m_in to m_pipe_write as long as both reading
00163       // and writing don't fail. If either call fails, it's most probably due
00164       // to the associated file descriptor having been closed by the call to
00165       // the close() method.
00166       m_loop.run(m_in, m_pipe_write);
00167    }
00168 
00169    boost::shared_ptr<boost::asio::io_service> m_ios;
00170    boost::shared_ptr<boost::thread> m_input_thread;
00171 
00172    detail::forwarding_loop m_loop;
00173 
00174    native_io_t m_in;
00175    native_io_t m_pipe_write;
00176    async_stream_t m_input;
00177 };
00178 
00179 } }
00180 
00181 #endif