libmoost
|
00001 /* vim:set ts=3 sw=3 sts=3 et: */ 00028 // This is worth reading if you are new to inotify 00029 // http://www.linuxjournal.com/article/8478?page=0,0 00030 00031 #ifndef MOOST_IO_NOTIFY_LINUX_HPP__ 00032 #define MOOST_IO_NOTIFY_LINUX_HPP__ 00033 00034 #include <boost/function.hpp> 00035 #include <boost/shared_ptr.hpp> 00036 #include <boost/thread.hpp> 00037 #include <boost/thread/shared_mutex.hpp> 00038 #include <boost/thread/mutex.hpp> 00039 #include <boost/thread/locks.hpp> 00040 #include <boost/bind.hpp> 00041 #include <boost/asio.hpp> 00042 00043 #include <sys/inotify.h> 00044 00045 #include <string> 00046 #include <vector> 00047 #include <map> 00048 00049 #include <iostream> 00050 #include <stdio.h> 00051 #include <poll.h> 00052 00053 namespace moost { namespace io { 00054 00055 class ionotify 00056 { 00057 public: 00058 ionotify( 00059 bool stop_immediately = false // Terminate as soon as the event processor quits, don't flush handlers 00060 ): 00061 fd_(inotify_init()), 00062 stop_immediately_(stop_immediately) 00063 { 00064 if(fd_ < 0) 00065 { 00066 throw std::runtime_error("failed to initialise inotify"); 00067 } 00068 } 00069 00070 ~ionotify() 00071 { 00072 try 00073 { 00074 stop(); 00075 close(fd_); // And we're done 00076 } 00077 catch(...) 00078 { 00079 // swallowed. 00080 } 00081 } 00082 00083 enum file_action 00084 { 00085 CREATED = 0, 00086 CHANGED = 1, 00087 DELETED = 2 00088 }; 00089 00090 typedef boost::function<void(file_action fa, const std::string & path)> callback_t; 00091 00092 private: 00093 typedef std::pair<callback_t, std::string> event_t; 00094 typedef std::map<int, event_t> event_map_t; 00095 typedef std::map<std::string, int> path_map_t; 00096 typedef boost::shared_ptr<boost::thread> pthread_t; 00097 00098 static const int EVENT_MASK = IN_ATTRIB | IN_DELETE | IN_DELETE_SELF | IN_MODIFY | IN_MOVE_SELF | IN_MOVED_FROM; 00099 00100 public: 00101 void insert(std::string const & path, callback_t const & callback, bool call_now = false) 00102 { 00103 boost::unique_lock<boost::shared_mutex> ul(smtx_event_); 00104 00105 // Add a new watch to the event notifier 00106 00107 int wd = inotify_add_watch( 00108 fd_, path.c_str(), 00109 EVENT_MASK 00110 ); 00111 00112 if(wd < 0) 00113 { 00114 throw std::runtime_error("failed to add watch"); 00115 } 00116 00117 path_map_[path] = wd; 00118 event_map_[wd] = event_t(callback, path); 00119 00120 // If we've been asked to fire now, well maybe we should :) 00121 if(call_now) { callback(CHANGED, path); } 00122 } 00123 00124 void erase(std::string const & path) 00125 { 00126 boost::unique_lock<boost::shared_mutex> ul(smtx_event_); 00127 00128 // Find an event and if we do remove the watch 00129 path_map_t::iterator itr = path_map_.find(path); 00130 00131 if(itr != path_map_.end()) 00132 { 00133 inotify_rm_watch(itr->second, EVENT_MASK); // If this fails it fails... not much we can do really! 00134 event_map_.erase(itr->second); 00135 path_map_.erase(itr); 00136 } 00137 } 00138 00139 void start() 00140 { 00141 boost::unique_lock<boost::mutex> ul(mtx_stop_start_); 00142 00143 // Start event handler queue processor 00144 if(!pthread_handler) 00145 { 00146 // we only have one thread because we want events to be processed in order, we're 00147 // using asio to do this so we don't block the event processng queue whilst we wait 00148 // for a slow user callback to finish. 00149 spwork_.reset(new boost::asio::io_service::work(ioservice_)); 00150 pthread_handler.reset(new boost::thread(boost::bind(&boost::asio::io_service::run, &ioservice_))); 00151 } 00152 00153 // start the event processer 00154 if(!pthread_event) 00155 { 00156 pthread_event.reset(new boost::thread(boost::bind(&ionotify::thread_proc, this))); 00157 } 00158 } 00159 00160 void stop() 00161 { 00162 boost::unique_lock<boost::mutex> ul(mtx_stop_start_); 00163 00164 // Stop event handler queue processor 00165 if(pthread_event) 00166 { 00167 // Wait for the event processor to be done 00168 pthread_event->interrupt(); 00169 pthread_event->join(); 00170 pthread_event.reset(); 00171 } 00172 00173 // Stop the event processer 00174 if(pthread_handler) 00175 { 00176 // If we're not interested in flushing the event queue stop now! 00177 if(stop_immediately_) 00178 { 00179 ioservice_.stop(); // Stop!!!!! 00180 } 00181 00182 // Wait for the event queue to be done 00183 spwork_.reset(); 00184 pthread_handler->join(); 00185 } 00186 } 00187 00188 private: 00189 00190 bool is_file_action(int mask, int eventid) 00191 { 00192 // I faculitate mask matching :) 00193 return ((mask & eventid) == eventid); 00194 } 00195 00196 bool get_file_action(int mask, file_action & fa) 00197 { 00198 // Here we're mapping inotify masl values to our file action enum 00199 if(is_file_action(mask, IN_DELETE)) { fa = DELETED; return true; } 00200 if(is_file_action(mask, IN_DELETE_SELF)) { fa = DELETED; return true; } 00201 if(is_file_action(mask, IN_MODIFY)) { fa = CHANGED; return true; } 00202 if(is_file_action(mask, IN_MOVE_SELF)) { fa = DELETED; return true; } 00203 if(is_file_action(mask, IN_MOVED_FROM)) { fa = DELETED; return true; } 00204 00205 return false; 00206 } 00207 00208 void thread_proc() 00209 { 00210 size_t const EVENT_SIZE = ( sizeof (inotify_event) ); 00211 size_t const BUF_LEN = ( 1024 * ( EVENT_SIZE + 16 ) ); 00212 std::vector<char> vbuf(BUF_LEN); 00213 00214 struct pollfd pfd; 00215 int ret; 00216 00217 pfd.fd = fd_; 00218 pfd.events = POLLIN; 00219 00220 for(;;) 00221 { 00222 boost::this_thread::interruption_point(); // Thread interupt point 00223 00224 // Every second poll() will timeout and give us a chance to interrupt 00225 pfd.revents = 0; 00226 ret = ::poll(&pfd, 1, 1000); 00227 00228 boost::this_thread::interruption_point(); // Thread interupt point 00229 00230 if (ret < 0) 00231 { 00232 // error - um, whoops! Ignored. 00233 } 00234 else if (!ret) 00235 { 00236 // timed out - we don't care. Ignored. 00237 } 00238 else if (pfd.revents & POLLIN) 00239 { 00240 // signaled - let's start reading the event sequence 00241 int elen = 0; 00242 int len = read (fd_, &vbuf[0], vbuf.size()); 00243 00244 if (len < 0) 00245 { 00246 // Hmmm... the read returned an error state 00247 00248 if (errno == EINTR) 00249 { 00250 continue; // More is coming... go fetch 00251 } 00252 else 00253 { 00254 // Read error. Ignored. Let's try and process what we can. 00255 } 00256 } 00257 else if (!len) 00258 { 00259 // buffer is to small, let's increase it by BUF_LEN - at most allow BUF_LEN * 10 00260 00261 if(vbuf.size() < (BUF_LEN * 10)) 00262 { 00263 vbuf.resize(vbuf.size() + BUF_LEN); 00264 } 00265 else 00266 { 00267 // Ok, we have a bit of a problem... this really should never happen so ignored! 00268 } 00269 } 00270 00271 // Whilst there are events to process... 00272 while (len && elen < len) 00273 { 00274 boost::this_thread::interruption_point(); // Therad interupt point 00275 00276 // Bytes to event object please 00277 inotify_event *this_event = (inotify_event *) &vbuf[elen]; 00278 elen += EVENT_SIZE + this_event->len; 00279 00280 boost::shared_lock<boost::shared_mutex> sl(smtx_event_); 00281 event_map_t::iterator itrEvent = event_map_.find(this_event->wd); 00282 00283 // These aren't the droids you're looking for - but is it an event we care about? 00284 if(itrEvent != event_map_.end()) 00285 { 00286 try 00287 { 00288 // Great, get the corresponding file action and hand the event to asio to fire 00289 file_action fa; 00290 if(get_file_action(this_event->mask, fa)) 00291 { 00292 // new events add a handler to the event hander queue, which is processed by asio 00293 ioservice_.post( 00294 boost::bind( 00295 itrEvent->second.first, fa, itrEvent->second.second 00296 ) 00297 ); 00298 } 00299 } 00300 catch(...) 00301 { 00302 // swallow exceptions, we don't want our thread to die due to callback error 00303 } 00304 } 00305 } 00306 } 00307 } 00308 } 00309 00310 private: 00311 int fd_; 00312 event_map_t event_map_; 00313 path_map_t path_map_; 00314 pthread_t pthread_event; 00315 boost::shared_mutex smtx_event_; 00316 boost::mutex mtx_stop_start_; 00317 boost::asio::io_service ioservice_; 00318 boost::shared_ptr<boost::asio::io_service::work> spwork_; 00319 pthread_t pthread_handler; 00320 bool stop_immediately_; 00321 }; 00322 00323 }} // moost::io 00324 00325 #endif // MOOST_IO_NOTIFY_LINUX_HPP__