libmoost
/home/mhx/git/github/libmoost/include/moost/io/remote_watcher.hpp
Go to the documentation of this file.
00001 /* vim:set ts=3 sw=3 sts=3 et: */
00028 #ifndef MOOST_REMOTE_WATCHER_HPP__
00029 #define MOOST_REMOTE_WATCHER_HPP__
00030 
00031 #include <map>
00032 #include <string>
00033 #include <sstream>
00034 #include <limits>
00035 #include <functional>
00036 
00037 #include <boost/asio.hpp>
00038 
00039 #include <boost/thread.hpp>
00040 #include <boost/shared_ptr.hpp>
00041 #include <boost/bind.hpp>
00042 #include <boost/function.hpp>
00043 #include <boost/algorithm/string/trim.hpp>
00044 #include <boost/functional/hash.hpp>
00045 
00046 #include "../thread/xtime_util.hpp"
00047 
00052 namespace moost { namespace io {
00053 
00061 class remote_watcher
00062 {
00063 public:
00064 
00066   enum file_action
00067   {
00068      CREATED = 0, 
00069      CHANGED = 1, 
00070      DELETED = 2
00071   };
00072 
00073   typedef boost::function<void(file_action action, const std::string & url)> callback_t;
00074 
00075 private:
00076 
00078   std::map<std::string, callback_t > m_file_callback;
00080 
00090   std::map<std::string, std::string > m_file_modified;
00091 
00094   boost::mutex m_file_mutex;
00095 
00097   boost::shared_ptr< boost::thread> m_pthread;
00098 
00099   // m_run is thread-safe: it's mutex/conditioned to coordinate with the async watcher thread
00100   // when false, and the cond is signalled, the watcher thread will wake up and exit
00101 
00103   bool m_run;
00105 
00109   boost::mutex m_run_mutex;
00111   boost::condition m_run_cond;
00112 
00113   int m_sleep_ms;
00114   int m_timeout_ms;
00115 
00116   std::time_t get_time()
00117   {
00118      std::time_t ltime;
00119      time(&ltime);
00120      return ltime;
00121   }
00122 
00124   std::string last_write_time(const std::string & url)
00125   {
00126      using boost::asio::ip::tcp;
00127      tcp::iostream s;
00128      if ( !connect(s, url, m_timeout_ms) )
00129         return "";
00130 
00131      std::string response_line;
00132      size_t pos;
00133      const std::string lastModified = "Last-Modified: ";
00134      const std::string contentLength = "Content-Length: ";
00135      std::string returned;
00136 
00137      for ( int i = 0; !s.eof(); ++i )
00138      {
00139         std::getline(s, response_line);
00140 
00141         if ( s.bad() )
00142           throw std::runtime_error("Bad stream! (Timeout?)");
00143 
00144         boost::algorithm::trim(response_line);
00145 
00146         if ( response_line.empty() )
00147           break;
00148 
00149         if ( i == 0 && response_line != "HTTP/1.0 200 OK" )
00150           return ""; // not found!
00151 
00152         pos = response_line.find(lastModified);
00153         if ( pos != std::string::npos )
00154           return response_line.substr(lastModified.size()); // ah: last modified is the best case
00155         pos = response_line.find(contentLength);
00156         if ( pos != std::string::npos )
00157           returned = response_line.substr(contentLength.size());
00158      }
00159 
00160      if ( returned.empty() && s.good() )
00161      {
00162         // ah nothing found. Let's parse the entire page and hash it!
00163         size_t hash = 0;
00164         for ( int i = 0; !s.eof(); ++i )
00165         {
00166            std::getline(s, response_line);
00167            if ( s.bad() )
00168               throw std::runtime_error("Bad stream! (Timeout?)");
00169            boost::algorithm::trim(response_line);
00170            boost::hash_combine(hash, response_line);
00171         }
00172 
00173         std::ostringstream oss;
00174         oss << hash;
00175         returned = oss.str();
00176      }
00177 
00178      return returned;
00179   }
00180 
00181 public:
00182 
00186   remote_watcher(int sleep_ms = 10000, int timeout_ms = 2000) :
00187     m_run(false), m_sleep_ms(sleep_ms), m_timeout_ms(timeout_ms)
00188   {
00189   }
00190 
00192 
00197   ~remote_watcher()
00198   {
00199     stop();
00200   }
00201 
00203 
00206   void insert( const std::string & url,
00207                const callback_t & callback)
00208   {
00209     boost::mutex::scoped_lock lock(m_file_mutex);
00210     m_file_callback[url] = callback;
00211 
00212     //boost::filesystem::path p(path);
00213     std::string lw = last_write_time(url);
00214     if ( !lw.empty() )
00215       m_file_modified[url] = lw;
00216     else
00217       m_file_modified.erase(url);
00218   }
00219 
00221   void clear()
00222   {
00223      this->stop();
00224      boost::mutex::scoped_lock lock(m_file_mutex);
00225      m_file_modified.clear();
00226      m_file_callback.clear();
00227   }
00228 
00230 
00233   void erase(const std::string & url)
00234   {
00235     boost::mutex::scoped_lock lock(m_file_mutex);
00236     m_file_callback.erase(url);
00237     m_file_modified.erase(url);
00238   }
00239 
00241 
00245   void start()
00246   {
00247     boost::mutex::scoped_lock lock(m_run_mutex);
00248     if (m_run)
00249       return;
00250     m_run = true;
00251     m_pthread.reset(new boost::thread(boost::bind(&remote_watcher::run, this)));
00252   }
00253 
00255 
00258   void stop()
00259   {
00260     {
00261       boost::mutex::scoped_lock lock(m_run_mutex);
00262       if (!m_run)
00263         return;
00264       m_run = false;
00265       // Notify the asynchronous monitor thread that it should wake up
00266       // if it's currently waiting on m_run_mutex
00267       m_run_cond.notify_one();
00268     }
00269     // can only join once we've released the mutex, so run() loop can finish up
00270     if ( m_pthread )
00271       m_pthread->join();
00272   }
00273 
00275 
00278   static bool connect( boost::asio::ip::tcp::iostream& s, std::string url,
00279                        int timeout_ms,  bool skip_header = false )
00280   {
00281      std::string protocol = "http";
00282      std::string protocolUrl = protocol + "://";
00283 
00284      std::string host;
00285      std::string page = "/";
00286 
00287      if ( url.size() > protocolUrl.size() &&
00288           url.substr(0, protocolUrl.size()) == protocolUrl )
00289      {
00290         url = url.substr(protocolUrl.size());
00291      }
00292 
00293      size_t posCol = url.find(':');
00294      size_t posPage = url.find('/');
00295      if ( posCol != std::string::npos && posCol != 0 && ++posCol < url.size())
00296      {
00297         host = url.substr(0, posCol-1);
00298         if ( posPage == std::string::npos )
00299         {
00300            protocol = url.substr(posCol); // i.e. http://www.myHost.com:8080
00301         }
00302         else
00303         {
00304            protocol = url.substr(posCol, posPage-(posCol)); // i.e. http://www.myHost.com:8080/page
00305            page = url.substr(posPage);
00306         }
00307      }
00308      else if ( posPage != std::string::npos && posPage != 0)
00309      {
00310         host = url.substr(0, posPage); // i.e. http://www.myHost.com/page
00311         page = url.substr(posPage);
00312      }
00313      else
00314         host = url; // i.e. http://www.myHost.com
00315 
00316      s.connect(host, protocol);
00317      if ( !s )
00318         return false;
00319 
00320      struct timeval r = { (int)(timeout_ms/1000),
00321                           (int)((timeout_ms%1000)*1000) };
00322 
00323      // native!
00324      setsockopt(s.rdbuf()->native(), SOL_SOCKET, SO_SNDTIMEO, reinterpret_cast<const char*>(&r), sizeof(r));
00325      setsockopt(s.rdbuf()->native(), SOL_SOCKET, SO_RCVTIMEO, reinterpret_cast<const char*>(&r), sizeof(r));
00326 
00327      s << "GET " << page << " HTTP/1.0\r\n"
00328        << "Host: " << host << "\r\n"
00329        << "\r\n"
00330        << std::flush;
00331 
00332      if ( !s )
00333         return false;
00334 
00335      if ( skip_header )
00336      {
00337         std::string response_line;
00338         for ( int i = 0; !s.eof(); ++i )
00339         {
00340            std::getline(s, response_line);
00341            boost::algorithm::trim(response_line);
00342            if ( response_line.empty() )
00343               break;
00344         }
00345      }
00346 
00347      return true;
00348   }
00349 
00350 private:
00351 
00353   void run()
00354   {
00355     bool run = true;
00356     typedef std::pair< callback_t , std::pair< file_action, std::string> > notification;
00357     std::vector< notification> notifications;
00358 
00359     for (;;)
00360     {
00361       {
00362         // Lock and check m_run; if it was set to false, we must return ASAP
00363         boost::mutex::scoped_lock lock(m_run_mutex);
00364         run = m_run;
00365         if (!run)
00366           return;
00367         // We release the lock, block the thread until one second
00368         m_run_cond.timed_wait(lock, moost::thread::xtime_util::add_ms(moost::thread::xtime_util::now(), m_sleep_ms));
00369       }
00370       // If we are not running (e.g. when the destructor woke us up), return
00371       if (!run)
00372         return;
00373 
00374       // Clear the notifications vector where we will collect the events
00375       // that will be fired
00376       notifications.clear();
00377       {
00378         // Lock m_file_mutex while we are working on m_file_callback
00379         boost::mutex::scoped_lock lock(m_file_mutex);
00380         for (std::map<std::string, callback_t>::iterator it = m_file_callback.begin(); it != m_file_callback.end(); ++it)
00381         {
00382           // Does the path exist?
00383           std::string lw = last_write_time(it->first);
00384           if ( !lw.empty() )
00385           {
00386             // Check its last modification time and compare it with what we had earlier
00387             std::map< std::string, std::string >::iterator it_mod = m_file_modified.find(it->first);
00388 
00389             if (it_mod == m_file_modified.end())
00390             {
00391               // We haven't seen this file so far, so insert it into the
00392               // map and add a creation event that will be fired
00393               m_file_modified[it->first] = lw;
00394               notifications.push_back(std::make_pair(it->second, std::make_pair(CREATED, it->first)));
00395             }
00396             else
00397             {
00398               if (lw != it_mod->second)
00399                 notifications.push_back(std::make_pair(it->second, std::make_pair(CHANGED, it->first)));
00400               it_mod->second = lw;
00401             }
00402           }
00403           else
00404           {
00405             // The path does not exist. Did we have it before? If so, fire
00406             // a deletion event.
00407             std::map< std::string, std::string >::iterator it_mod = m_file_modified.find(it->first);
00408             if (it_mod != m_file_modified.end())
00409             {
00410               m_file_modified.erase(it_mod);
00411               notifications.push_back(std::make_pair(it->second, std::make_pair(DELETED, it->first)));
00412             }
00413           }
00414         }
00415       }
00416 
00417       // okay!  we've released our lock on m_file_callback and m_file_modified
00418       // so it's time to send off our notifications
00419       for (std::vector<notification>::iterator it = notifications.begin(); it != notifications.end(); ++it)
00420       {
00421         try
00422         {
00423           it->first(it->second.first, it->second.second);
00424         }
00425         catch (...)
00426         {
00427           // \todo  can we do better here than silently ignoring the exception?
00428         }
00429       }
00430     }
00431   }
00432 };
00433 
00434 }} // moost::io
00435 
00436 #endif // MOOST_REMOTE_WATCHER_HPP__