libmoost
/home/mhx/git/github/libmoost/include/moost/thread/worker_group.hpp
Go to the documentation of this file.
00001 /* vim:set ts=3 sw=3 sts=3 et: */
00030 #ifndef MOOST_THREAD_WORKER_GROUP_HPP
00031 #define MOOST_THREAD_WORKER_GROUP_HPP
00032 
#include <csignal>
#include <cstddef>
#include <queue>
#include <stdexcept>

#include <boost/bind.hpp>
#include <boost/function.hpp>
#include <boost/noncopyable.hpp>
#include <boost/thread.hpp>
00039 
00040 namespace moost { namespace thread {
00041 
00049 class worker_group : public boost::noncopyable
00050 {
00051 public:
00052    typedef boost::function0<void> job_t;
00053 
00059    explicit worker_group(size_t num_workers = 1)
00060       : m_running(1)
00061    {
00062       if (num_workers < 1)
00063       {
00064          throw std::runtime_error("invalid number of worker threads");
00065       }
00066 
00067       for (size_t i = 0; i < num_workers; ++i)
00068       {
00069          m_workers.create_thread(boost::bind(&worker_group::work, this));
00070       }
00071    }
00072 
00076    ~worker_group()
00077    {
00078       try
00079       {
00080          stop();
00081       }
00082       catch (...)
00083       {
00084       }
00085    }
00086 
00090    void stop()
00091    {
00092       if (m_running)
00093       {
00094          {
00095             boost::mutex::scoped_lock lock(m_mx);
00096             m_running = 0;
00097          }
00098          m_cond.notify_all();
00099          m_workers.join_all();
00100       }
00101    }
00102 
00106    bool running() const
00107    {
00108       return m_running;
00109    }
00110 
00118    bool add_job(job_t job)
00119    {
00120       bool added = m_running != 0;
00121 
00122       if (added)
00123       {
00124          {
00125             boost::mutex::scoped_lock lock(m_mx);
00126             m_jobs.push(job);
00127          }
00128 
00129          m_cond.notify_one();
00130       }
00131 
00132       return added;
00133    }
00134 
00140    size_t size() const
00141    {
00142       return m_workers.size();
00143    }
00144 
00150    size_t queued_jobs() const
00151    {
00152       boost::mutex::scoped_lock lock(m_mx);
00153       return m_jobs.size();
00154    }
00155 
00156 private:
00157    typedef std::queue<job_t> jobs_t;
00158 
00159    void work()
00160    {
00161       for (;;)
00162       {
00163          job_t job;
00164 
00165          {
00166             boost::mutex::scoped_lock lock(m_mx);
00167 
00168             while (m_jobs.empty() && m_running)
00169             {
00170                m_cond.wait(lock);
00171             }
00172 
00173             if (m_jobs.empty())
00174             {
00175                if (m_running)
00176                   continue;
00177                else
00178                   break;
00179             }
00180 
00181             job = m_jobs.front();
00182             m_jobs.pop();
00183          }
00184 
00185          job();
00186       }
00187    }
00188 
00189    boost::thread_group m_workers;
00190    jobs_t m_jobs;
00191    boost::condition_variable m_cond;
00192    mutable boost::mutex m_mx;
00193    volatile sig_atomic_t m_running;
00194 };
00195 
00196 }}
00197 
00198 #endif