// libmoost
00001 /* vim:set ts=3 sw=3 sts=3 et: */ 00028 #ifndef MOOST_REMOTE_WATCHER_HPP__ 00029 #define MOOST_REMOTE_WATCHER_HPP__ 00030 00031 #include <map> 00032 #include <string> 00033 #include <sstream> 00034 #include <limits> 00035 #include <functional> 00036 00037 #include <boost/asio.hpp> 00038 00039 #include <boost/thread.hpp> 00040 #include <boost/shared_ptr.hpp> 00041 #include <boost/bind.hpp> 00042 #include <boost/function.hpp> 00043 #include <boost/algorithm/string/trim.hpp> 00044 #include <boost/functional/hash.hpp> 00045 00046 #include "../thread/xtime_util.hpp" 00047 00052 namespace moost { namespace io { 00053 00061 class remote_watcher 00062 { 00063 public: 00064 00066 enum file_action 00067 { 00068 CREATED = 0, 00069 CHANGED = 1, 00070 DELETED = 2 00071 }; 00072 00073 typedef boost::function<void(file_action action, const std::string & url)> callback_t; 00074 00075 private: 00076 00078 std::map<std::string, callback_t > m_file_callback; 00080 00090 std::map<std::string, std::string > m_file_modified; 00091 00094 boost::mutex m_file_mutex; 00095 00097 boost::shared_ptr< boost::thread> m_pthread; 00098 00099 // m_run is thread-safe: it's mutex/conditioned to coordinate with the async watcher thread 00100 // when false, and the cond is signalled, the watcher thread will wake up and exit 00101 00103 bool m_run; 00105 00109 boost::mutex m_run_mutex; 00111 boost::condition m_run_cond; 00112 00113 int m_sleep_ms; 00114 int m_timeout_ms; 00115 00116 std::time_t get_time() 00117 { 00118 std::time_t ltime; 00119 time(<ime); 00120 return ltime; 00121 } 00122 00124 std::string last_write_time(const std::string & url) 00125 { 00126 using boost::asio::ip::tcp; 00127 tcp::iostream s; 00128 if ( !connect(s, url, m_timeout_ms) ) 00129 return ""; 00130 00131 std::string response_line; 00132 size_t pos; 00133 const std::string lastModified = "Last-Modified: "; 00134 const std::string contentLength = "Content-Length: "; 00135 std::string returned; 00136 00137 for ( int i = 0; 
!s.eof(); ++i ) 00138 { 00139 std::getline(s, response_line); 00140 00141 if ( s.bad() ) 00142 throw std::runtime_error("Bad stream! (Timeout?)"); 00143 00144 boost::algorithm::trim(response_line); 00145 00146 if ( response_line.empty() ) 00147 break; 00148 00149 if ( i == 0 && response_line != "HTTP/1.0 200 OK" ) 00150 return ""; // not found! 00151 00152 pos = response_line.find(lastModified); 00153 if ( pos != std::string::npos ) 00154 return response_line.substr(lastModified.size()); // ah: last modified is the best case 00155 pos = response_line.find(contentLength); 00156 if ( pos != std::string::npos ) 00157 returned = response_line.substr(contentLength.size()); 00158 } 00159 00160 if ( returned.empty() && s.good() ) 00161 { 00162 // ah nothing found. Let's parse the entire page and hash it! 00163 size_t hash = 0; 00164 for ( int i = 0; !s.eof(); ++i ) 00165 { 00166 std::getline(s, response_line); 00167 if ( s.bad() ) 00168 throw std::runtime_error("Bad stream! (Timeout?)"); 00169 boost::algorithm::trim(response_line); 00170 boost::hash_combine(hash, response_line); 00171 } 00172 00173 std::ostringstream oss; 00174 oss << hash; 00175 returned = oss.str(); 00176 } 00177 00178 return returned; 00179 } 00180 00181 public: 00182 00186 remote_watcher(int sleep_ms = 10000, int timeout_ms = 2000) : 00187 m_run(false), m_sleep_ms(sleep_ms), m_timeout_ms(timeout_ms) 00188 { 00189 } 00190 00192 00197 ~remote_watcher() 00198 { 00199 stop(); 00200 } 00201 00203 00206 void insert( const std::string & url, 00207 const callback_t & callback) 00208 { 00209 boost::mutex::scoped_lock lock(m_file_mutex); 00210 m_file_callback[url] = callback; 00211 00212 //boost::filesystem::path p(path); 00213 std::string lw = last_write_time(url); 00214 if ( !lw.empty() ) 00215 m_file_modified[url] = lw; 00216 else 00217 m_file_modified.erase(url); 00218 } 00219 00221 void clear() 00222 { 00223 this->stop(); 00224 boost::mutex::scoped_lock lock(m_file_mutex); 00225 m_file_modified.clear(); 
00226 m_file_callback.clear(); 00227 } 00228 00230 00233 void erase(const std::string & url) 00234 { 00235 boost::mutex::scoped_lock lock(m_file_mutex); 00236 m_file_callback.erase(url); 00237 m_file_modified.erase(url); 00238 } 00239 00241 00245 void start() 00246 { 00247 boost::mutex::scoped_lock lock(m_run_mutex); 00248 if (m_run) 00249 return; 00250 m_run = true; 00251 m_pthread.reset(new boost::thread(boost::bind(&remote_watcher::run, this))); 00252 } 00253 00255 00258 void stop() 00259 { 00260 { 00261 boost::mutex::scoped_lock lock(m_run_mutex); 00262 if (!m_run) 00263 return; 00264 m_run = false; 00265 // Notify the asynchronous monitor thread that it should wake up 00266 // if it's currently waiting on m_run_mutex 00267 m_run_cond.notify_one(); 00268 } 00269 // can only join once we've released the mutex, so run() loop can finish up 00270 if ( m_pthread ) 00271 m_pthread->join(); 00272 } 00273 00275 00278 static bool connect( boost::asio::ip::tcp::iostream& s, std::string url, 00279 int timeout_ms, bool skip_header = false ) 00280 { 00281 std::string protocol = "http"; 00282 std::string protocolUrl = protocol + "://"; 00283 00284 std::string host; 00285 std::string page = "/"; 00286 00287 if ( url.size() > protocolUrl.size() && 00288 url.substr(0, protocolUrl.size()) == protocolUrl ) 00289 { 00290 url = url.substr(protocolUrl.size()); 00291 } 00292 00293 size_t posCol = url.find(':'); 00294 size_t posPage = url.find('/'); 00295 if ( posCol != std::string::npos && posCol != 0 && ++posCol < url.size()) 00296 { 00297 host = url.substr(0, posCol-1); 00298 if ( posPage == std::string::npos ) 00299 { 00300 protocol = url.substr(posCol); // i.e. http://www.myHost.com:8080 00301 } 00302 else 00303 { 00304 protocol = url.substr(posCol, posPage-(posCol)); // i.e. http://www.myHost.com:8080/page 00305 page = url.substr(posPage); 00306 } 00307 } 00308 else if ( posPage != std::string::npos && posPage != 0) 00309 { 00310 host = url.substr(0, posPage); // i.e. 
http://www.myHost.com/page 00311 page = url.substr(posPage); 00312 } 00313 else 00314 host = url; // i.e. http://www.myHost.com 00315 00316 s.connect(host, protocol); 00317 if ( !s ) 00318 return false; 00319 00320 struct timeval r = { (int)(timeout_ms/1000), 00321 (int)((timeout_ms%1000)*1000) }; 00322 00323 // native! 00324 setsockopt(s.rdbuf()->native(), SOL_SOCKET, SO_SNDTIMEO, reinterpret_cast<const char*>(&r), sizeof(r)); 00325 setsockopt(s.rdbuf()->native(), SOL_SOCKET, SO_RCVTIMEO, reinterpret_cast<const char*>(&r), sizeof(r)); 00326 00327 s << "GET " << page << " HTTP/1.0\r\n" 00328 << "Host: " << host << "\r\n" 00329 << "\r\n" 00330 << std::flush; 00331 00332 if ( !s ) 00333 return false; 00334 00335 if ( skip_header ) 00336 { 00337 std::string response_line; 00338 for ( int i = 0; !s.eof(); ++i ) 00339 { 00340 std::getline(s, response_line); 00341 boost::algorithm::trim(response_line); 00342 if ( response_line.empty() ) 00343 break; 00344 } 00345 } 00346 00347 return true; 00348 } 00349 00350 private: 00351 00353 void run() 00354 { 00355 bool run = true; 00356 typedef std::pair< callback_t , std::pair< file_action, std::string> > notification; 00357 std::vector< notification> notifications; 00358 00359 for (;;) 00360 { 00361 { 00362 // Lock and check m_run; if it was set to false, we must return ASAP 00363 boost::mutex::scoped_lock lock(m_run_mutex); 00364 run = m_run; 00365 if (!run) 00366 return; 00367 // We release the lock, block the thread until one second 00368 m_run_cond.timed_wait(lock, moost::thread::xtime_util::add_ms(moost::thread::xtime_util::now(), m_sleep_ms)); 00369 } 00370 // If we are not running (e.g. 
when the destructor woke us up), return 00371 if (!run) 00372 return; 00373 00374 // Clear the notifications vector where we will collect the events 00375 // that will be fired 00376 notifications.clear(); 00377 { 00378 // Lock m_file_mutex while we are working on m_file_callback 00379 boost::mutex::scoped_lock lock(m_file_mutex); 00380 for (std::map<std::string, callback_t>::iterator it = m_file_callback.begin(); it != m_file_callback.end(); ++it) 00381 { 00382 // Does the path exist? 00383 std::string lw = last_write_time(it->first); 00384 if ( !lw.empty() ) 00385 { 00386 // Check its last modification time and compare it with what we had earlier 00387 std::map< std::string, std::string >::iterator it_mod = m_file_modified.find(it->first); 00388 00389 if (it_mod == m_file_modified.end()) 00390 { 00391 // We haven't seen this file so far, so insert it into the 00392 // map and add a creation event that will be fired 00393 m_file_modified[it->first] = lw; 00394 notifications.push_back(std::make_pair(it->second, std::make_pair(CREATED, it->first))); 00395 } 00396 else 00397 { 00398 if (lw != it_mod->second) 00399 notifications.push_back(std::make_pair(it->second, std::make_pair(CHANGED, it->first))); 00400 it_mod->second = lw; 00401 } 00402 } 00403 else 00404 { 00405 // The path does not exist. Did we have it before? If so, fire 00406 // a deletion event. 00407 std::map< std::string, std::string >::iterator it_mod = m_file_modified.find(it->first); 00408 if (it_mod != m_file_modified.end()) 00409 { 00410 m_file_modified.erase(it_mod); 00411 notifications.push_back(std::make_pair(it->second, std::make_pair(DELETED, it->first))); 00412 } 00413 } 00414 } 00415 } 00416 00417 // okay! 
we've released our lock on m_file_callback and m_file_modified 00418 // so it's time to send off our notifications 00419 for (std::vector<notification>::iterator it = notifications.begin(); it != notifications.end(); ++it) 00420 { 00421 try 00422 { 00423 it->first(it->second.first, it->second.second); 00424 } 00425 catch (...) 00426 { 00427 // \todo can we do better here than silently ignoring the exception? 00428 } 00429 } 00430 } 00431 } 00432 }; 00433 00434 }} // moost::io 00435 00436 #endif // MOOST_IO_REMOTE_WATCHER_HPP__