libmoost
|
00001 /* vim:set ts=3 sw=3 sts=3 et: */ 00030 #ifndef MOOST_THREAD_WORKER_GROUP_HPP 00031 #define MOOST_THREAD_WORKER_GROUP_HPP 00032 00033 #include <queue> 00034 #include <csignal> 00035 00036 #include <boost/function.hpp> 00037 #include <boost/thread.hpp> 00038 #include <boost/bind.hpp> 00039 00040 namespace moost { namespace thread { 00041 00049 class worker_group : public boost::noncopyable 00050 { 00051 public: 00052 typedef boost::function0<void> job_t; 00053 00059 explicit worker_group(size_t num_workers = 1) 00060 : m_running(1) 00061 { 00062 if (num_workers < 1) 00063 { 00064 throw std::runtime_error("invalid number of worker threads"); 00065 } 00066 00067 for (size_t i = 0; i < num_workers; ++i) 00068 { 00069 m_workers.create_thread(boost::bind(&worker_group::work, this)); 00070 } 00071 } 00072 00076 ~worker_group() 00077 { 00078 try 00079 { 00080 stop(); 00081 } 00082 catch (...) 00083 { 00084 } 00085 } 00086 00090 void stop() 00091 { 00092 if (m_running) 00093 { 00094 { 00095 boost::mutex::scoped_lock lock(m_mx); 00096 m_running = 0; 00097 } 00098 m_cond.notify_all(); 00099 m_workers.join_all(); 00100 } 00101 } 00102 00106 bool running() const 00107 { 00108 return m_running; 00109 } 00110 00118 bool add_job(job_t job) 00119 { 00120 bool added = m_running != 0; 00121 00122 if (added) 00123 { 00124 { 00125 boost::mutex::scoped_lock lock(m_mx); 00126 m_jobs.push(job); 00127 } 00128 00129 m_cond.notify_one(); 00130 } 00131 00132 return added; 00133 } 00134 00140 size_t size() const 00141 { 00142 return m_workers.size(); 00143 } 00144 00150 size_t queued_jobs() const 00151 { 00152 boost::mutex::scoped_lock lock(m_mx); 00153 return m_jobs.size(); 00154 } 00155 00156 private: 00157 typedef std::queue<job_t> jobs_t; 00158 00159 void work() 00160 { 00161 for (;;) 00162 { 00163 job_t job; 00164 00165 { 00166 boost::mutex::scoped_lock lock(m_mx); 00167 00168 while (m_jobs.empty() && m_running) 00169 { 00170 m_cond.wait(lock); 00171 } 00172 00173 if (m_jobs.empty()) 00174 { 00175 if (m_running) 00176 continue; 00177 else 00178 break; 00179 } 00180 00181 job = m_jobs.front(); 00182 m_jobs.pop(); 00183 } 00184 00185 job(); 00186 } 00187 } 00188 00189 boost::thread_group m_workers; 00190 jobs_t m_jobs; 00191 boost::condition_variable m_cond; 00192 mutable boost::mutex m_mx; 00193 volatile sig_atomic_t m_running; 00194 }; 00195 00196 }} 00197 00198 #endif