libmoost
/home/mhx/git/github/libmoost/include/moost/thread/threaded_job_scheduler.hpp
Go to the documentation of this file.
00001 /* vim:set ts=3 sw=3 sts=3 et: */
00030 #ifndef MOOST_THREAD_THREADED_JOB_SCHEDULER_HPP
00031 #define MOOST_THREAD_THREADED_JOB_SCHEDULER_HPP
00032 
00033 #include <cassert>
00034 #include <queue>
00035 #include <string>
00036 #include <vector>
00037 
00038 #include <boost/noncopyable.hpp>
00039 #include <boost/enable_shared_from_this.hpp>
00040 #include <boost/bind.hpp>
00041 
00042 #include "job_batch.hpp"
00043 #include "worker_group.hpp"
00044 
00045 namespace moost { namespace thread {
00046 
00053 class threaded_job_batch : public job_batch
00054                          , public boost::noncopyable
00055                          , public boost::enable_shared_from_this<threaded_job_batch>
00056 {
00057 public:
00058    threaded_job_batch()
00059       : m_todo(0)
00060       , m_count(0)
00061    {
00062    }
00063 
00064    virtual void add(job_t job)
00065    {
00066       {
00067          boost::mutex::scoped_lock lock(m_mx);
00068          m_jobs.push(job);
00069          ++m_todo;
00070       }
00071 
00072       m_cond.notify_one();
00073    }
00074 
00075    void run(worker_group& wg)
00076    {
00077       while (!done())
00078       {
00079          do_one(wg);
00080       }
00081    }
00082 
00088    size_t count() const
00089    {
00090       boost::mutex::scoped_lock lock(m_mx);
00091       return m_count;
00092    }
00093 
00106    const std::vector<std::string>& errors() const
00107    {
00108       boost::mutex::scoped_lock lock(m_mx);
00109 
00110       if (m_todo > 0)
00111       {
00112          throw std::runtime_error("cannot call errors() when there are unfinished jobs");
00113       }
00114 
00115       return m_errors;
00116    }
00117 
00118 private:
00119    typedef std::queue<job_t> jobs_t;
00120 
00128    void do_one(worker_group& wg)
00129    {
00130       job_t job;
00131 
00132       {
00133          boost::mutex::scoped_lock lock(m_mx);
00134          assert(!m_jobs.empty());
00135          job = m_jobs.front();
00136          m_jobs.pop();
00137       }
00138 
00139       wg.add_job(boost::bind(&threaded_job_batch::run_one, shared_from_this(), job));
00140    }
00141 
00150    bool done() const
00151    {
00152       boost::mutex::scoped_lock lock(m_mx);
00153 
00154       while (m_todo > 0 && m_jobs.empty())
00155       {
00156          m_cond.wait(lock);
00157       }
00158 
00159       return m_todo == 0;
00160    }
00161 
00168    void run_one(job_t& job)
00169    {
00170       try
00171       {
00172          job();  // JFDI! :)
00173       }
00174       catch (const std::exception& e)
00175       {
00176          boost::mutex::scoped_lock lock(m_mx_errors);
00177          m_errors.push_back(e.what());
00178       }
00179       catch (...)
00180       {
00181          boost::mutex::scoped_lock lock(m_mx_errors);
00182          m_errors.push_back("unknown exception caught");
00183       }
00184 
00185       {
00186          boost::mutex::scoped_lock lock(m_mx);
00187          --m_todo;
00188          ++m_count;
00189       }
00190 
00191       m_cond.notify_one();
00192    }
00193 
00194    jobs_t m_jobs;
00195    mutable boost::condition_variable m_cond;
00196    mutable boost::mutex m_mx;
00197    volatile size_t m_todo;
00198    volatile size_t m_count;
00199    std::vector<std::string> m_errors;
00200    mutable boost::mutex m_mx_errors;
00201 };
00202 
00209 class threaded_job_scheduler : public boost::noncopyable
00210 {
00211 public:
00212    typedef threaded_job_batch job_batch_type;
00213 
00219    explicit threaded_job_scheduler(size_t num_workers = 1)
00220       : m_workers(num_workers)
00221    {
00222    }
00223 
00229    void dispatch(boost::shared_ptr<job_batch_type> batch)
00230    {
00231       batch->run(m_workers);
00232    }
00233 
00234 private:
00235    worker_group m_workers;
00236 };
00237 
00238 }}
00239 
00240 #endif