libmoost
|
00001 /* vim:set ts=3 sw=3 sts=3 et: */ 00028 #ifndef MOOST_THREAD_ASYNC_BATCH_PROCESSOR 00029 #define MOOST_THREAD_ASYNC_BATCH_PROCESSOR 00030 00031 #include <boost/function.hpp> 00032 #include <boost/ref.hpp> 00033 #include <boost/thread.hpp> 00034 #include <boost/shared_ptr.hpp> 00035 #include <boost/bind.hpp> 00036 00037 #include "../utils/foreach.hpp" 00038 00039 #include "worker_group.hpp" 00040 00041 namespace moost { namespace thread { 00042 00059 class async_batch_processor 00060 { 00061 private: 00062 class batch_state 00063 { 00064 public: 00065 batch_state(size_t todo) 00066 : m_todo(todo) 00067 { 00068 } 00069 00070 void one_done() 00071 { 00072 { 00073 boost::lock_guard<boost::mutex> lock(m_mx); 00074 --m_todo; 00075 } 00076 00077 m_cond.notify_one(); 00078 } 00079 00080 void wait_for_all_done() 00081 { 00082 boost::unique_lock<boost::mutex> lock(m_mx); 00083 00084 while(m_todo > 0) // when m_todo == 0 all jobs are done 00085 { 00086 // wait for workers to signal when they're done 00087 m_cond.wait(lock); 00088 } 00089 } 00090 00091 private: 00092 volatile size_t m_todo; 00093 boost::mutex m_mx; 00094 boost::condition_variable m_cond; 00095 }; 00096 00097 public: 00098 typedef boost::function0<void> job_t; 00099 typedef std::vector<job_t> jobs_t; 00100 00107 async_batch_processor(size_t num_threads) 00108 : m_wg(num_threads) 00109 { 00110 } 00111 00121 void dispatch(jobs_t & jobs) 00122 { 00123 if (!jobs.empty()) 00124 { 00125 boost::shared_ptr<batch_state> state(new batch_state(jobs.size())); 00126 00127 // dispatch jobs 00128 foreach (job_t& job, jobs) 00129 { 00130 m_wg.add_job(boost::bind(&async_batch_processor::handler, boost::ref(job), state)); 00131 } 00132 00133 // and wait until all are done 00134 state->wait_for_all_done(); 00135 } 00136 } 00137 00138 private: 00139 static void handler(job_t& job, boost::shared_ptr<batch_state> state) 00140 { 00141 // do job 00142 job(); 00143 00144 // and tell the state object we're done 00145 state->one_done(); 00146 } 00147 00148 worker_group m_wg; 00149 }; 00150 00151 }} 00152 00153 #endif // MOOST_THREAD_ASYNC_BATCH_PROCESSOR