libmoost
/home/mhx/git/github/libmoost/include/moost/thread/async_batch_processor.hpp
Go to the documentation of this file.
00001 /* vim:set ts=3 sw=3 sts=3 et: */
00028 #ifndef MOOST_THREAD_ASYNC_BATCH_PROCESSOR
00029 #define MOOST_THREAD_ASYNC_BATCH_PROCESSOR
00030 
00031 #include <boost/function.hpp>
00032 #include <boost/ref.hpp>
00033 #include <boost/thread.hpp>
00034 #include <boost/shared_ptr.hpp>
00035 #include <boost/bind.hpp>
00036 
00037 #include "../utils/foreach.hpp"
00038 
00039 #include "worker_group.hpp"
00040 
00041 namespace moost { namespace thread {
00042 
00059 class async_batch_processor
00060 {
00061 private:
00062    class batch_state
00063    {
00064    public:
00065       batch_state(size_t todo)
00066          : m_todo(todo)
00067       {
00068       }
00069 
00070       void one_done()
00071       {
00072          {
00073             boost::lock_guard<boost::mutex> lock(m_mx);
00074             --m_todo;
00075          }
00076 
00077          m_cond.notify_one();
00078       }
00079 
00080       void wait_for_all_done()
00081       {
00082          boost::unique_lock<boost::mutex> lock(m_mx);
00083 
00084          while(m_todo > 0) // when m_todo == 0 all jobs are done
00085          {
00086             // wait for workers to signal when they're done
00087             m_cond.wait(lock);
00088          }
00089       }
00090 
00091    private:
00092       volatile size_t m_todo;
00093       boost::mutex m_mx;
00094       boost::condition_variable m_cond;
00095    };
00096 
00097 public:
00098    typedef boost::function0<void> job_t;
00099    typedef std::vector<job_t> jobs_t;
00100 
00107    async_batch_processor(size_t num_threads)
00108       : m_wg(num_threads)
00109    {
00110    }
00111 
00121    void dispatch(jobs_t & jobs)
00122    {
00123       if (!jobs.empty())
00124       {
00125          boost::shared_ptr<batch_state> state(new batch_state(jobs.size()));
00126 
00127          // dispatch jobs
00128          foreach (job_t& job, jobs)
00129          {
00130             m_wg.add_job(boost::bind(&async_batch_processor::handler, boost::ref(job), state));
00131          }
00132 
00133          // and wait until all are done
00134          state->wait_for_all_done();
00135       }
00136    }
00137 
00138 private:
00139    static void handler(job_t& job, boost::shared_ptr<batch_state> state)
00140    {
00141       // do job
00142       job();
00143 
00144       // and tell the state object we're done
00145       state->one_done();
00146    }
00147 
00148    worker_group m_wg;
00149 };
00150 
00151 }}
00152 
00153 #endif // MOOST_THREAD_ASYNC_BATCH_PROCESSOR