libmoost
|
00001 /* vim:set ts=3 sw=3 sts=3 et: */ 00030 #ifndef MOOST_THREAD_THREADED_JOB_SCHEDULER_HPP 00031 #define MOOST_THREAD_THREADED_JOB_SCHEDULER_HPP 00032 00033 #include <cassert> 00034 #include <queue> 00035 #include <string> 00036 #include <vector> 00037 00038 #include <boost/noncopyable.hpp> 00039 #include <boost/enable_shared_from_this.hpp> 00040 #include <boost/bind.hpp> 00041 00042 #include "job_batch.hpp" 00043 #include "worker_group.hpp" 00044 00045 namespace moost { namespace thread { 00046 00053 class threaded_job_batch : public job_batch 00054 , public boost::noncopyable 00055 , public boost::enable_shared_from_this<threaded_job_batch> 00056 { 00057 public: 00058 threaded_job_batch() 00059 : m_todo(0) 00060 , m_count(0) 00061 { 00062 } 00063 00064 virtual void add(job_t job) 00065 { 00066 { 00067 boost::mutex::scoped_lock lock(m_mx); 00068 m_jobs.push(job); 00069 ++m_todo; 00070 } 00071 00072 m_cond.notify_one(); 00073 } 00074 00075 void run(worker_group& wg) 00076 { 00077 while (!done()) 00078 { 00079 do_one(wg); 00080 } 00081 } 00082 00088 size_t count() const 00089 { 00090 boost::mutex::scoped_lock lock(m_mx); 00091 return m_count; 00092 } 00093 00106 const std::vector<std::string>& errors() const 00107 { 00108 boost::mutex::scoped_lock lock(m_mx); 00109 00110 if (m_todo > 0) 00111 { 00112 throw std::runtime_error("cannot call errors() when there are unfinished jobs"); 00113 } 00114 00115 return m_errors; 00116 } 00117 00118 private: 00119 typedef std::queue<job_t> jobs_t; 00120 00128 void do_one(worker_group& wg) 00129 { 00130 job_t job; 00131 00132 { 00133 boost::mutex::scoped_lock lock(m_mx); 00134 assert(!m_jobs.empty()); 00135 job = m_jobs.front(); 00136 m_jobs.pop(); 00137 } 00138 00139 wg.add_job(boost::bind(&threaded_job_batch::run_one, shared_from_this(), job)); 00140 } 00141 00150 bool done() const 00151 { 00152 boost::mutex::scoped_lock lock(m_mx); 00153 00154 while (m_todo > 0 && m_jobs.empty()) 00155 { 00156 m_cond.wait(lock); 00157 } 00158 00159 return m_todo == 0; 00160 } 00161 00168 void run_one(job_t& job) 00169 { 00170 try 00171 { 00172 job(); // JFDI! :) 00173 } 00174 catch (const std::exception& e) 00175 { 00176 boost::mutex::scoped_lock lock(m_mx_errors); 00177 m_errors.push_back(e.what()); 00178 } 00179 catch (...) 00180 { 00181 boost::mutex::scoped_lock lock(m_mx_errors); 00182 m_errors.push_back("unknown exception caught"); 00183 } 00184 00185 { 00186 boost::mutex::scoped_lock lock(m_mx); 00187 --m_todo; 00188 ++m_count; 00189 } 00190 00191 m_cond.notify_one(); 00192 } 00193 00194 jobs_t m_jobs; 00195 mutable boost::condition_variable m_cond; 00196 mutable boost::mutex m_mx; 00197 volatile size_t m_todo; 00198 volatile size_t m_count; 00199 std::vector<std::string> m_errors; 00200 mutable boost::mutex m_mx_errors; 00201 }; 00202 00209 class threaded_job_scheduler : public boost::noncopyable 00210 { 00211 public: 00212 typedef threaded_job_batch job_batch_type; 00213 00219 explicit threaded_job_scheduler(size_t num_workers = 1) 00220 : m_workers(num_workers) 00221 { 00222 } 00223 00229 void dispatch(boost::shared_ptr<job_batch_type> batch) 00230 { 00231 batch->run(m_workers); 00232 } 00233 00234 private: 00235 worker_group m_workers; 00236 }; 00237 00238 }} 00239 00240 #endif