libmoost
/home/mhx/git/github/libmoost/include/moost/thread/async_worker.hpp
Go to the documentation of this file.
00001 /* vim:set ts=3 sw=3 sts=3 et: */
00028 #ifndef MOOST_THREAD_ASYNC_WORKER_HPP__
00029 #define MOOST_THREAD_ASYNC_WORKER_HPP__
00030 
00031 #include <vector>
00032 #include <stdexcept>
00033 
00034 #include <boost/thread.hpp>
00035 #include <boost/thread/condition.hpp>
00036 #include <boost/bind.hpp>
00037 #include <boost/shared_ptr.hpp>
00038 
00039 #include "xtime_util.hpp"
00040 
00041 namespace moost { namespace thread {
00042 
00044 struct enqueue_timeout : public std::runtime_error
00045 {
00046   enqueue_timeout() : std::runtime_error("enqueue timed out") {}
00047 };
00048 
00053 template<typename TWork>
00054 class async_worker
00055 {
00056 private:
00057 
00058   std::vector< boost::shared_ptr<boost::thread> > m_pworker_threads;
00059   std::vector< boost::shared_ptr< TWork > >       m_work;
00060   size_t                                          m_max_queue;
00061   size_t                                          m_enqueue_timeout_ms;
00062   bool                                            m_working;
00063 
00064   boost::mutex                                    m_work_mutex;
00065   boost::condition                                m_work_to_do;
00066   boost::condition                                m_work_done;
00067 
00069   void work_loop()
00070   {
00071     for (;;)
00072     {
00073       boost::shared_ptr< TWork > work;
00074 
00075       {
00076         boost::mutex::scoped_lock lock(m_work_mutex);
00077 
00078         while (m_work.empty())
00079         {
00080           if (!m_working)
00081             break;
00082           m_work_to_do.wait(lock);
00083         }
00084         if (m_work.empty()) // can only be true if we were told to stop and we have nothing left to do
00085           break;
00086 
00087         // inform anyone waiting to enqueue that a spot has freed up
00088         m_work_done.notify_one();
00089         work = m_work.front();
00090         m_work.erase(m_work.begin());
00091       }
00092 
00093       try
00094       {
00095         do_work(*work);
00096       }
00097       catch (const std::exception & e)
00098       {
00099         report_error(e);
00100       }
00101       catch (...)
00102       {
00103         report_error(std::runtime_error("async_worker: unknown exception in worker"));
00104       }
00105     }
00106   }
00107 
00108 protected:
00109 
00111   virtual void do_work(TWork & work) = 0;
00112 
00114   virtual void report_error(const std::exception &) {}
00115 
00116 public:
00117 
00123   async_worker(size_t num_threads = 1,
00124                size_t    max_queue = 0,
00125                size_t    enqueue_timeout_ms = 0)
00126   : m_pworker_threads(num_threads),
00127     m_max_queue(max_queue),
00128     m_enqueue_timeout_ms(enqueue_timeout_ms),
00129     m_working(false)
00130   {
00131     start();
00132   }
00133 
00139   virtual ~async_worker()
00140   {
00141     // it would be a bug to put stop() here and allow the user the assume async_workers are safe to destroy
00142     // without stopping.  what happens is the implementing class destroys first, and if you're unlucky,
00143     // the implementing class's do_work has now disappeared but the worker thread is still running
00144     // very bad things happen as a result
00145     // if you want this behavior, put stop() into the implementing class' dtor
00146   }
00147 
00152   void enqueue(const TWork & work)
00153   {
00154     boost::mutex::scoped_lock lock(m_work_mutex);
00155 
00156     if (!m_working)
00157       throw std::runtime_error("can't enqueue when not working");
00158 
00159     while (m_max_queue > 0 && m_work.size() == static_cast<size_t>(m_max_queue))
00160     {
00161       // too much work to do!
00162       if (m_enqueue_timeout_ms == 0)
00163         m_work_done.wait(lock);
00164       else
00165       {
00166         if (!m_work_done.timed_wait(lock, xtime_util::add_ms(xtime_util::now(), m_enqueue_timeout_ms)))
00167           throw enqueue_timeout();
00168       }
00169     }
00170 
00171     m_work.push_back(boost::shared_ptr<TWork>(new TWork(work)));
00172 
00173     // wake up, you got work to do!
00174     // notice we notify even if queue wasn't empty
00175     // this way we ensure we're running as many
00176     // parallel workers as possible
00177     // TODO: some people say it's less efficient to notify while you still own the lock
00178     // i don't believe this, but it might be worth testing
00179     m_work_to_do.notify_one();
00180   }
00181 
00182   // @brief starts all worker threads
00183   void start()
00184   {
00185     {
00186       boost::mutex::scoped_lock lock(m_work_mutex);
00187       if (m_working)
00188         return;
00189       m_working = true;
00190     }
00191     for (std::vector< boost::shared_ptr<boost::thread> >::iterator it = m_pworker_threads.begin(); it != m_pworker_threads.end(); ++it)
00192       it->reset(new boost::thread(boost::bind(&async_worker::work_loop, this)));
00193   }
00194 
00195   // @brief stops all worker threads, and waits for them to finish all enqueued work
00196   void stop()
00197   {
00198     {
00199       boost::mutex::scoped_lock lock(m_work_mutex);
00200       if (!m_working)
00201         return;
00202       m_working = false;
00203     }
00204     m_work_to_do.notify_all();
00205     for (std::vector< boost::shared_ptr<boost::thread> >::iterator it = m_pworker_threads.begin(); it != m_pworker_threads.end(); ++it)
00206       (*it)->join();
00207   }
00208 
00209 };
00210 
00211 }} // moost::thread
00212 
00213 #endif // MOOST_THREAD_ASYNC_WORKER_HPP__