libmoost
|
00001 /* vim:set ts=3 sw=3 sts=3 et: */ 00028 #ifndef MOOST_TRANSACTION_QUEUE_HPP__ 00029 #define MOOST_TRANSACTION_QUEUE_HPP__ 00030 00031 // A collection of queue types for use with moost/transaction/handler.hpp 00032 00033 #include <fstream> 00034 #include <sstream> 00035 #include <string> 00036 #include <stdexcept> 00037 #include <iomanip> 00038 00039 #include <boost/filesystem.hpp> 00040 #include <boost/lexical_cast.hpp> 00041 #include <boost/regex.hpp> 00042 00043 namespace moost { namespace transaction { 00044 00045 00046 //=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- 00047 // Use this class as the base for your specialized serializer 00048 class SerializerBase 00049 { 00050 public: 00051 void Purge(std::string const & key) const 00052 { 00053 boost::filesystem::remove(key); 00054 } 00055 00056 protected: 00057 // Only my sub-class can create and destroy me :) 00058 SerializerBase(){} 00059 ~SerializerBase(){} 00060 }; 00061 //=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- 00062 // This is a generic serialiser for POD types ONLY. If you data has 00063 // specific serialisation requirements you need to implement you own 00064 // serialization functor. 00065 00066 template < 00067 typename dataT, 00068 typename istreamT = std::ifstream, 00069 typename ostreamT = std::ofstream 00070 > 00071 class Serializer : public SerializerBase 00072 { 00073 public: 00074 bool Serialise(std::string const & key, dataT const & data) const 00075 { 00076 ostreamT oStream(key.c_str(), std::ios::binary | std::ios::trunc); 00077 bool bOk = oStream.is_open(); 00078 00079 if(bOk) 00080 { 00081 oStream.write(reinterpret_cast<char const *>(&data), sizeof(data)); 00082 bOk = oStream.good(); 00083 oStream.close(); 00084 } 00085 00086 return bOk; 00087 } 00088 00089 bool Deserialise(std::string const & key, dataT & data) const 00090 { 00091 istreamT iStream(key.c_str(), std::ios::binary); 00092 bool bOk = iStream.is_open(); 00093 00094 if(bOk) 00095 { 00096 iStream.read(reinterpret_cast<char *>(&data), sizeof(data)); 00097 bOk = iStream.good(); 00098 iStream.close(); 00099 } 00100 00101 return bOk; 00102 } 00103 }; 00104 00105 //=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- 00106 // This is the generic interface for transaction queues 00107 00108 template < 00109 typename dataT 00110 > 00111 class ITransactionQueue 00112 { 00113 public: 00114 typedef dataT value_type; 00115 00116 // The interface is polymorphic so any sub-classes can be interchanged dynamically at runtime 00117 virtual ~ITransactionQueue(){} 00118 00119 virtual size_t size() const = 0; 00120 virtual bool empty() const = 0; 00121 virtual value_type & front() = 0; 00122 virtual void push_back(value_type const & data) = 0; 00123 virtual void pop_front() = 0; 00124 }; 00125 00126 //=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- 00127 // A non-persisted queue, that uses the ITransactionQueue interface. This 00128 // is provided to allow none, partial and full parsistance to be swapped 00129 // dynamically at runtime depending upon settings 00130 00131 template < 00132 typename dataT, 00133 template<typename T, typename A = std::allocator<T> > class queueT = std::deque 00134 > 00135 class NonePersistedTQ : public ITransactionQueue<dataT> 00136 { 00137 public: 00138 typedef dataT value_type; 00139 00140 size_t size() const { return m_queue.size(); } 00141 bool empty() const { return m_queue.empty(); } 00142 value_type & front() { return m_queue.front(); } 00143 void push_back(value_type const & data) { m_queue.push_back(data); }; 00144 void pop_front() { m_queue.pop_front(); }; 00145 private: 00146 queueT<dataT> m_queue; 00147 }; 00148 00149 //=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- 00150 // This is the base class for the Fully and Partially persisted queues 00151 // When the queue is instantiated it will check to see if items exist 00152 // in the backing store and they do it will automatically add them to 00153 // the front of the queue. 00154 00155 template < 00156 typename dataT, 00157 typename serializerT = Serializer<dataT>, 00158 template<typename T, typename A = std::allocator<T> > class queueT = std::deque 00159 > 00160 class BasePersistedTQ : public ITransactionQueue<dataT> 00161 { 00162 public: 00163 00164 typedef dataT value_type; 00165 00166 BasePersistedTQ(std::string const & rootDir, std::string const & queueId) : 00167 m_nextKey(0), m_rootDir(rootDir), m_guid("15934E61-04A5-47cf-86FF-3E02F08F5931"), m_queueId(queueId) 00168 { 00169 //=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- 00170 // Create a regex to compare the found files with what we actually want 00171 //=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- 00172 std::stringstream ss; 00173 ss // Eg. ^15934E61-04A5-47cf-86FF-3E02F08F5931-([\dABCDEFGabcdefg]{8})\.myqueue$ 00174 << "^" << m_guid // Starts with our guid 00175 <<"-([\\dABCDEFGabcdefg]{8})" // followed by 8 digit hexidecimal 00176 << "\\." // file extension seperator 00177 << m_queueId << "$"; // ends with file extension, using the queue id 00178 //=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- 00179 // [ guid ] [ key ] [ qid ] 00180 boost::regex re(ss.str()); // Eg. 15934E61-04A5-47cf-86FF-3E02F08F5931-0F0A0002.myqueue 00181 //=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- 00182 00183 boost::filesystem::directory_iterator dirItr; 00184 00185 try 00186 { 00187 // As far as I can tell, this doesn't support globbing :( 00188 dirItr = boost::filesystem::directory_iterator(rootDir); 00189 } 00190 catch(std::exception const & e) 00191 { 00192 std::stringstream ss; 00193 ss 00194 << "Unable to open file queue: " 00195 << rootDir 00196 << 00197 "(" 00198 << e.what() 00199 << ")"; 00200 00201 throw std::runtime_error(ss.str()); 00202 } 00203 00204 for( ; dirItr != boost::filesystem::directory_iterator() ; ++ dirItr) 00205 { 00206 boost::smatch m; 00207 00208 // We need to bind the leaf to a const reference as it's a temporary 00209 // but we need to to exist beyond the lifetime of the call. 00210 std::string const & fname = dirItr->path().leaf(); 00211 00212 if(boost::filesystem::is_regular(dirItr->status()) && boost::regex_match(fname, m, re) && m.size() > 1) 00213 { 00214 // Decode string hex value 00215 std::istringstream ss(m[1]); 00216 uint32_t key = uint32_t(); 00217 ss >> std::hex >> key; 00218 00219 dataT data = dataT(); 00220 00221 if(!m_serializer.Deserialise(dirItr->string(), data)) 00222 { 00223 throw std::runtime_error(std::string("Error loading file queue: ") + dirItr->string()); 00224 } 00225 00226 push_back_(key, data); 00227 m_nextKey = std::max(m_nextKey, key + 1); 00228 } 00229 } 00230 } 00231 00232 ~BasePersistedTQ(){} 00233 00234 size_t size() const { return m_queue.size(); } 00235 00236 bool empty() const { return m_queue.empty(); } 00237 00238 value_type & front() 00239 { 00240 return m_queue.front().second; 00241 } 00242 00243 void push_back(value_type const & data) 00244 { 00245 push_back_(data); 00246 } 00247 00248 void pop_front() 00249 { 00250 // Remove persisted item 00251 m_serializer.Purge(GenerateSerialiseKey(m_queue.front().first)); 00252 00253 // Remove from the queue 00254 m_queue.pop_front(); 00255 } 00256 00257 protected: 00258 00259 std::string GenerateSerialiseKey(uint32_t key) const 00260 { 00261 std::stringstream ss; 00262 00263 // We put the guid first as this allows the regex that matches paths 00264 // to short circuit and terminate early. It can abort as soon as the 00265 // guid, which is a fixed string, is found not to match! 00266 ss 00267 << m_guid << "-" 00268 << std::hex << std::setw(8) << std::setfill('0') << key 00269 << "." << m_queueId; 00270 00271 return (m_rootDir / ss.str()).string(); 00272 } 00273 00274 void push_back_(uint32_t key, value_type const & data) 00275 { 00276 m_queue.push_back(std::make_pair(key, data)); 00277 } 00278 00279 uint32_t push_back_(value_type const & data) 00280 { 00281 uint32_t key = m_nextKey++; 00282 push_back_(key, data); 00283 return key; 00284 } 00285 00286 protected: 00287 queueT<std::pair<uint32_t, dataT> > m_queue; 00288 serializerT m_serializer; 00289 00290 private: 00291 uint32_t m_nextKey; 00292 boost::filesystem::path m_rootDir; 00293 std::string m_guid; 00294 std::string m_queueId; 00295 }; 00296 00297 //=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- 00298 // This queue will persist the item at the front when it is requested 00299 00300 template < 00301 typename dataT, 00302 typename serializerT = Serializer<dataT>, 00303 template<typename T, typename A = std::allocator<T> > class queueT = std::deque 00304 > 00305 class PartiallyPersistedTQ : public BasePersistedTQ<dataT, serializerT, queueT> 00306 { 00307 public: 00308 PartiallyPersistedTQ( 00309 std::string const & rootDir, 00310 std::string const & queueId 00311 ) : BasePersistedTQ<dataT, serializerT, queueT>(rootDir, queueId) {} 00312 00313 dataT & front() 00314 { 00315 // First we get the key and data item from the queue 00316 uint32_t key = this->m_queue.front().first; 00317 dataT & data = this->m_queue.front().second; 00318 00319 // Now we serialise the data 00320 this->m_serializer.Serialise(this->GenerateSerialiseKey(key), data); 00321 00322 // Now we return the data 00323 return data; 00324 } 00325 }; 00326 00327 //=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- 00328 // This queue will persist all items as they are added to the queue 00329 00330 template < 00331 typename dataT, 00332 typename serializerT = Serializer<dataT>, 00333 template<typename T, typename A = std::allocator<T> > class queueT = std::deque 00334 > 00335 class FullyPersistedTQ : public BasePersistedTQ<dataT, serializerT, queueT > 00336 { 00337 public: 00338 FullyPersistedTQ( 00339 std::string const & rootDir, 00340 std::string const & queueId 00341 ) : BasePersistedTQ<dataT, serializerT, queueT>(rootDir, queueId) {} 00342 00343 00344 void push_back(dataT const & data) 00345 { 00346 // First we add it to the queue 00347 uint32_t key = this->push_back_(data); 00348 00349 // And now it's in the queue we serialise 00350 this->m_serializer.Serialise(this->GenerateSerialiseKey(key), data); 00351 } 00352 }; 00353 //=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- 00354 00355 }} 00356 00357 #endif // MOOST_TRANSACTION_QUEUE_HPP__