libmoost
/home/mhx/git/github/libmoost/include/moost/io/detail/ionotify_linux.hpp
Go to the documentation of this file.
00001 /* vim:set ts=3 sw=3 sts=3 et: */
00028 // This is worth reading if you are new to inotify
00029 // http://www.linuxjournal.com/article/8478?page=0,0
00030 
00031 #ifndef MOOST_IO_NOTIFY_LINUX_HPP__
00032 #define MOOST_IO_NOTIFY_LINUX_HPP__
00033 
#include <boost/function.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include <boost/thread/shared_mutex.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/locks.hpp>
#include <boost/bind.hpp>
#include <boost/asio.hpp>

#include <errno.h>
#include <poll.h>
#include <stdio.h>
#include <sys/inotify.h>
#include <sys/types.h>
#include <unistd.h>

#include <iostream>
#include <map>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>
00052 
00053 namespace moost { namespace io {
00054 
00055    class ionotify
00056    {
00057    public:
00058       ionotify(
00059          bool stop_immediately = false   // Terminate as soon as the event processor quits, don't flush handlers
00060          ):
00061             fd_(inotify_init()),
00062             stop_immediately_(stop_immediately)
00063       {
00064          if(fd_ < 0)
00065          {
00066             throw std::runtime_error("failed to initialise inotify");
00067          }
00068       }
00069 
00070       ~ionotify()
00071       {
00072          try
00073          {
00074             stop();
00075             close(fd_); // And we're done
00076          }
00077          catch(...)
00078          {
00079             // swallowed.
00080          }
00081       }
00082 
00083       enum file_action
00084       {
00085          CREATED = 0, 
00086          CHANGED = 1, 
00087          DELETED = 2  
00088       };
00089 
00090       typedef boost::function<void(file_action fa, const std::string & path)> callback_t;
00091 
00092    private:
00093       typedef std::pair<callback_t, std::string> event_t;
00094       typedef std::map<int,  event_t> event_map_t;
00095       typedef std::map<std::string, int> path_map_t;
00096       typedef boost::shared_ptr<boost::thread> pthread_t;
00097 
00098       static const int EVENT_MASK = IN_ATTRIB | IN_DELETE | IN_DELETE_SELF | IN_MODIFY | IN_MOVE_SELF | IN_MOVED_FROM;
00099 
00100    public:
00101       void insert(std::string const & path, callback_t const & callback, bool call_now = false)
00102       {
00103          boost::unique_lock<boost::shared_mutex> ul(smtx_event_);
00104 
00105          // Add a new watch to the event notifier
00106 
00107          int wd = inotify_add_watch(
00108             fd_, path.c_str(),
00109             EVENT_MASK
00110             );
00111 
00112          if(wd < 0)
00113          {
00114             throw std::runtime_error("failed to add watch");
00115          }
00116 
00117          path_map_[path] = wd;
00118          event_map_[wd] = event_t(callback, path);
00119 
00120          // If we've been asked to fire now, well maybe we should :)
00121          if(call_now) { callback(CHANGED, path); }
00122       }
00123 
00124       void erase(std::string const & path)
00125       {
00126          boost::unique_lock<boost::shared_mutex> ul(smtx_event_);
00127 
00128          // Find an event and if we do remove the watch
00129          path_map_t::iterator itr  = path_map_.find(path);
00130 
00131          if(itr != path_map_.end())
00132          {
00133             inotify_rm_watch(itr->second, EVENT_MASK); // If this fails it fails... not much we can do really!
00134             event_map_.erase(itr->second);
00135             path_map_.erase(itr);
00136          }
00137       }
00138 
00139       void start()
00140       {
00141          boost::unique_lock<boost::mutex> ul(mtx_stop_start_);
00142 
00143          // Start event handler queue processor
00144          if(!pthread_handler)
00145          {
00146             // we only have one thread because we want events to be processed in order, we're
00147             // using asio to do this so we don't block the event processng queue whilst we wait
00148             // for a slow user callback to finish.
00149             spwork_.reset(new boost::asio::io_service::work(ioservice_));
00150             pthread_handler.reset(new boost::thread(boost::bind(&boost::asio::io_service::run, &ioservice_)));
00151          }
00152 
00153          // start the event processer
00154          if(!pthread_event)
00155          {
00156             pthread_event.reset(new boost::thread(boost::bind(&ionotify::thread_proc, this)));
00157          }
00158       }
00159 
00160       void stop()
00161       {
00162          boost::unique_lock<boost::mutex> ul(mtx_stop_start_);
00163 
00164          // Stop event handler queue processor
00165          if(pthread_event)
00166          {
00167             // Wait for the event processor to be done
00168             pthread_event->interrupt();
00169             pthread_event->join();
00170             pthread_event.reset();
00171          }
00172 
00173          // Stop the event processer
00174          if(pthread_handler)
00175          {
00176             // If we're not interested in flushing the event queue stop now!
00177             if(stop_immediately_)
00178             {
00179                ioservice_.stop(); // Stop!!!!!
00180             }
00181 
00182             // Wait for the event queue to be done
00183             spwork_.reset();
00184             pthread_handler->join();
00185          }
00186       }
00187 
00188    private:
00189 
00190       bool is_file_action(int mask, int eventid)
00191       {
00192          // I faculitate mask matching :)
00193          return ((mask & eventid) == eventid);
00194       }
00195 
00196       bool get_file_action(int mask, file_action & fa)
00197       {
00198          // Here we're mapping inotify masl values to our file action enum
00199          if(is_file_action(mask, IN_DELETE)) { fa = DELETED; return true; }
00200          if(is_file_action(mask, IN_DELETE_SELF)) { fa = DELETED; return true; }
00201          if(is_file_action(mask, IN_MODIFY)) { fa = CHANGED; return true; }
00202          if(is_file_action(mask, IN_MOVE_SELF)) { fa = DELETED; return true; }
00203          if(is_file_action(mask, IN_MOVED_FROM)) { fa = DELETED; return true; }
00204 
00205          return false;
00206       }
00207 
00208       void thread_proc()
00209       {
00210          size_t const EVENT_SIZE = ( sizeof (inotify_event) );
00211          size_t const BUF_LEN    = ( 1024 * ( EVENT_SIZE + 16 ) );
00212          std::vector<char> vbuf(BUF_LEN);
00213 
00214          struct pollfd pfd;
00215          int ret;
00216 
00217          pfd.fd = fd_;
00218          pfd.events = POLLIN;
00219 
00220          for(;;)
00221          {
00222             boost::this_thread::interruption_point(); // Thread interupt point
00223 
00224             // Every second poll() will timeout and give us a chance to interrupt
00225             pfd.revents = 0;
00226             ret = ::poll(&pfd, 1, 1000);
00227 
00228             boost::this_thread::interruption_point(); // Thread interupt point
00229 
00230             if (ret < 0)
00231             {
00232                // error - um, whoops! Ignored.
00233             }
00234             else if (!ret)
00235             {
00236                // timed out - we don't care. Ignored.
00237             }
00238             else if (pfd.revents & POLLIN)
00239             {
00240                // signaled - let's start reading the event sequence
00241                int elen = 0;
00242                int len = read (fd_, &vbuf[0], vbuf.size());
00243 
00244                if (len < 0)
00245                {
00246                   // Hmmm... the read returned an error state
00247 
00248                   if (errno == EINTR)
00249                   {
00250                      continue; // More is coming... go fetch
00251                   }
00252                   else
00253                   {
00254                      // Read error.  Ignored. Let's try and process what we can.
00255                   }
00256                }
00257                else if (!len)
00258                {
00259                   // buffer is to small, let's increase it by BUF_LEN - at most allow BUF_LEN * 10
00260 
00261                   if(vbuf.size() < (BUF_LEN * 10))
00262                   {
00263                      vbuf.resize(vbuf.size() + BUF_LEN);
00264                   }
00265                   else
00266                   {
00267                      // Ok, we have a bit of a problem... this really should never happen so ignored!
00268                   }
00269                }
00270 
00271                // Whilst there are events to process...
00272                while (len && elen < len)
00273                {
00274                   boost::this_thread::interruption_point(); // Therad interupt point
00275 
00276                   // Bytes to event object please
00277                   inotify_event *this_event = (inotify_event *) &vbuf[elen];
00278                   elen += EVENT_SIZE + this_event->len;
00279 
00280                   boost::shared_lock<boost::shared_mutex> sl(smtx_event_);
00281                   event_map_t::iterator itrEvent = event_map_.find(this_event->wd);
00282 
00283                   // These aren't the droids you're looking for - but is it an event we care about?
00284                   if(itrEvent != event_map_.end())
00285                   {
00286                      try
00287                      {
00288                         // Great, get the corresponding file action and hand the event to asio to fire
00289                         file_action fa;
00290                         if(get_file_action(this_event->mask, fa))
00291                         {
00292                            // new events add a handler to the event hander queue, which is processed by asio
00293                            ioservice_.post(
00294                               boost::bind(
00295                                     itrEvent->second.first, fa, itrEvent->second.second
00296                                  )
00297                               );
00298                         }
00299                      }
00300                      catch(...)
00301                      {
00302                         // swallow exceptions, we don't want our thread to die due to callback error
00303                      }
00304                   }
00305                }
00306             }
00307          }
00308       }
00309 
00310    private:
00311       int fd_;
00312       event_map_t event_map_;
00313       path_map_t path_map_;
00314       pthread_t pthread_event;
00315       boost::shared_mutex smtx_event_;
00316       boost::mutex mtx_stop_start_;
00317       boost::asio::io_service ioservice_;
00318       boost::shared_ptr<boost::asio::io_service::work> spwork_;
00319       pthread_t pthread_handler;
00320       bool stop_immediately_;
00321    };
00322 
00323 }} // moost::io
00324 
00325 #endif // MOOST_IO_NOTIFY_LINUX_HPP__