libmoost
|
00001 /* vim:set ts=3 sw=3 sts=3 et: */ 00028 #ifndef MOOST_IO_ASYNC_STREAM_FORWARDER_HPP__ 00029 #define MOOST_IO_ASYNC_STREAM_FORWARDER_HPP__ 00030 00040 #include <boost/asio.hpp> 00041 #include <boost/thread.hpp> 00042 #include <boost/shared_ptr.hpp> 00043 00044 #include "helper.hpp" 00045 00046 #if defined(BOOST_ASIO_HAS_WINDOWS_STREAM_HANDLE) && defined(_WIN32) 00047 # include "detail/async_stream_forwarder_win32.hpp" 00048 #elif defined(BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR) && (defined(_POSIX_SOURCE) || defined(__CYGWIN__)) 00049 # include "detail/async_stream_forwarder_posix.hpp" 00050 #else 00051 # error "apparently no stream forwarder support has been added for this platform" 00052 #endif 00053 00054 namespace moost { namespace io { 00055 00056 /* 00057 * Unfortunately, asynchronous reads for standard file streams (e.g. stdin) are a bad idea, 00058 * so in order to make them work with boost::asio, we need a separate thread that forwards the 00059 * stream data via an object that supports asynchronous i/o. 00060 */ 00061 00062 class async_stream_forwarder 00063 { 00064 public: 00065 typedef helper::native_io_t native_io_t; 00066 typedef helper::async_stream_t async_stream_t; 00067 00068 async_stream_forwarder(boost::shared_ptr<boost::asio::io_service> ios) 00069 : m_ios(ios) 00070 , m_input(*ios) 00071 { 00072 } 00073 00074 async_stream_forwarder(boost::shared_ptr<boost::asio::io_service> ios, native_io_t input, bool dup_input = true) 00075 : m_ios(ios) 00076 , m_input(*ios) 00077 { 00078 assign(input, dup_input); 00079 } 00080 00081 ~async_stream_forwarder() 00082 { 00083 // just to be on the safe side in case someone forgets to call close() explicitly 00084 try 00085 { 00086 close(); 00087 } 00088 catch (...) 00089 { 00090 // close() shouldn't throw at all, but let's make sure we don't throw during destruction 00091 } 00092 } 00093 00094 void assign(native_io_t input, bool dup_input = true) 00095 { 00096 if (m_input_thread) 00097 { 00098 throw std::runtime_error("attempt to assign handle twice"); 00099 } 00100 00101 m_in = dup_input ? helper::duplicate(input) : input; 00102 00103 try 00104 { 00105 native_io_t pipe_read; 00106 00107 helper::create_pipe(pipe_read, m_pipe_write); 00108 00109 try 00110 { 00111 m_input.assign(pipe_read); 00112 00113 m_input_thread.reset(new boost::thread(boost::bind(&async_stream_forwarder::input_thread, this))); 00114 } 00115 catch (...) 00116 { 00117 if (m_input.is_open()) 00118 { 00119 m_input.close(); 00120 } 00121 else 00122 { 00123 helper::close(pipe_read); 00124 } 00125 00126 throw; 00127 } 00128 } 00129 catch (...) 00130 { 00131 if (dup_input) 00132 { 00133 helper::close(m_in); 00134 } 00135 00136 throw; 00137 } 00138 } 00139 00140 template <typename HandlerT> 00141 void read_async(void *data, size_t size, HandlerT handler) 00142 { 00143 m_input.async_read_some(boost::asio::buffer(data, size), handler); 00144 } 00145 00146 void close() 00147 { 00148 if (m_input_thread) 00149 { 00150 m_input.close(); 00151 helper::close(m_in); 00152 helper::close(m_pipe_write); 00153 m_loop.stop(); 00154 m_input_thread->join(); 00155 m_input_thread.reset(); 00156 } 00157 } 00158 00159 private: 00160 void input_thread() const 00161 { 00162 // Continue forwarding from m_in to m_pipe_write as long as both reading 00163 // and writing don't fail. If either call fails, it's most probably due 00164 // to the associated file descriptor having been closed by the call to 00165 // the close() method. 00166 m_loop.run(m_in, m_pipe_write); 00167 } 00168 00169 boost::shared_ptr<boost::asio::io_service> m_ios; 00170 boost::shared_ptr<boost::thread> m_input_thread; 00171 00172 detail::forwarding_loop m_loop; 00173 00174 native_io_t m_in; 00175 native_io_t m_pipe_write; 00176 async_stream_t m_input; 00177 }; 00178 00179 } } 00180 00181 #endif