libmoost
|
00001 /* vim:set ts=3 sw=3 sts=3 et: */ 00028 #ifndef MOOST_THREAD_ASYNC_WORKER_HPP__ 00029 #define MOOST_THREAD_ASYNC_WORKER_HPP__ 00030 00031 #include <vector> 00032 #include <stdexcept> 00033 00034 #include <boost/thread.hpp> 00035 #include <boost/thread/condition.hpp> 00036 #include <boost/bind.hpp> 00037 #include <boost/shared_ptr.hpp> 00038 00039 #include "xtime_util.hpp" 00040 00041 namespace moost { namespace thread { 00042 00044 struct enqueue_timeout : public std::runtime_error 00045 { 00046 enqueue_timeout() : std::runtime_error("enqueue timed out") {} 00047 }; 00048 00053 template<typename TWork> 00054 class async_worker 00055 { 00056 private: 00057 00058 std::vector< boost::shared_ptr<boost::thread> > m_pworker_threads; 00059 std::vector< boost::shared_ptr< TWork > > m_work; 00060 size_t m_max_queue; 00061 size_t m_enqueue_timeout_ms; 00062 bool m_working; 00063 00064 boost::mutex m_work_mutex; 00065 boost::condition m_work_to_do; 00066 boost::condition m_work_done; 00067 00069 void work_loop() 00070 { 00071 for (;;) 00072 { 00073 boost::shared_ptr< TWork > work; 00074 00075 { 00076 boost::mutex::scoped_lock lock(m_work_mutex); 00077 00078 while (m_work.empty()) 00079 { 00080 if (!m_working) 00081 break; 00082 m_work_to_do.wait(lock); 00083 } 00084 if (m_work.empty()) // can only be true if we were told to stop and we have nothing left to do 00085 break; 00086 00087 // inform anyone waiting to enqueue that a spot has freed up 00088 m_work_done.notify_one(); 00089 work = m_work.front(); 00090 m_work.erase(m_work.begin()); 00091 } 00092 00093 try 00094 { 00095 do_work(*work); 00096 } 00097 catch (const std::exception & e) 00098 { 00099 report_error(e); 00100 } 00101 catch (...) 00102 { 00103 report_error(std::runtime_error("async_worker: unknown exception in worker")); 00104 } 00105 } 00106 } 00107 00108 protected: 00109 00111 virtual void do_work(TWork & work) = 0; 00112 00114 virtual void report_error(const std::exception &) {} 00115 00116 public: 00117 00123 async_worker(size_t num_threads = 1, 00124 size_t max_queue = 0, 00125 size_t enqueue_timeout_ms = 0) 00126 : m_pworker_threads(num_threads), 00127 m_max_queue(max_queue), 00128 m_enqueue_timeout_ms(enqueue_timeout_ms), 00129 m_working(false) 00130 { 00131 start(); 00132 } 00133 00139 virtual ~async_worker() 00140 { 00141 // it would be a bug to put stop() here and allow the user the assume async_workers are safe to destroy 00142 // without stopping. what happens is the implementing class destroys first, and if you're unlucky, 00143 // the implementing class's do_work has now disappeared but the worker thread is still running 00144 // very bad things happen as a result 00145 // if you want this behavior, put stop() into the implementing class' dtor 00146 } 00147 00152 void enqueue(const TWork & work) 00153 { 00154 boost::mutex::scoped_lock lock(m_work_mutex); 00155 00156 if (!m_working) 00157 throw std::runtime_error("can't enqueue when not working"); 00158 00159 while (m_max_queue > 0 && m_work.size() == static_cast<size_t>(m_max_queue)) 00160 { 00161 // too much work to do! 00162 if (m_enqueue_timeout_ms == 0) 00163 m_work_done.wait(lock); 00164 else 00165 { 00166 if (!m_work_done.timed_wait(lock, xtime_util::add_ms(xtime_util::now(), m_enqueue_timeout_ms))) 00167 throw enqueue_timeout(); 00168 } 00169 } 00170 00171 m_work.push_back(boost::shared_ptr<TWork>(new TWork(work))); 00172 00173 // wake up, you got work to do! 00174 // notice we notify even if queue wasn't empty 00175 // this way we ensure we're running as many 00176 // parallel workers as possible 00177 // TODO: some people say it's less efficient to notify while you still own the lock 00178 // i don't believe this, but it might be worth testing 00179 m_work_to_do.notify_one(); 00180 } 00181 00182 // @brief starts all worker threads 00183 void start() 00184 { 00185 { 00186 boost::mutex::scoped_lock lock(m_work_mutex); 00187 if (m_working) 00188 return; 00189 m_working = true; 00190 } 00191 for (std::vector< boost::shared_ptr<boost::thread> >::iterator it = m_pworker_threads.begin(); it != m_pworker_threads.end(); ++it) 00192 it->reset(new boost::thread(boost::bind(&async_worker::work_loop, this))); 00193 } 00194 00195 // @brief stops all worker threads, and waits for them to finish all enqueued work 00196 void stop() 00197 { 00198 { 00199 boost::mutex::scoped_lock lock(m_work_mutex); 00200 if (!m_working) 00201 return; 00202 m_working = false; 00203 } 00204 m_work_to_do.notify_all(); 00205 for (std::vector< boost::shared_ptr<boost::thread> >::iterator it = m_pworker_threads.begin(); it != m_pworker_threads.end(); ++it) 00206 (*it)->join(); 00207 } 00208 00209 }; 00210 00211 }} // moost::thread 00212 00213 #endif // MOOST_THREAD_ASYNC_WORKER_HPP__