libmoost
/home/mhx/git/github/libmoost/include/moost/transaction/queue.hpp
Go to the documentation of this file.
00001 /* vim:set ts=3 sw=3 sts=3 et: */
00028 #ifndef MOOST_TRANSACTION_QUEUE_HPP__
00029 #define MOOST_TRANSACTION_QUEUE_HPP__
00030 
00031 // A collection of queue types for use with moost/transaction/handler.hpp
00032 
#include <algorithm>
#include <cstddef>
#include <deque>
#include <fstream>
#include <iomanip>
#include <map>
#include <memory>
#include <sstream>
#include <stdexcept>
#include <string>
#include <utility>

#include <boost/filesystem.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/regex.hpp>
00042 
00043 namespace moost { namespace transaction {
00044 
00045 
00046    //=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
00047    // Use this class as the base for your specialized serializer
00048    class SerializerBase
00049    {
00050    public:
00051       void Purge(std::string const & key) const
00052       {
00053          boost::filesystem::remove(key);
00054       }
00055 
00056    protected:
00057       // Only my sub-class can create and destroy me :)
00058       SerializerBase(){}
00059       ~SerializerBase(){}
00060    };
00061    //=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
   // This is a generic serialiser for POD types ONLY. If your data has
   // specific serialisation requirements you need to implement your own
   // serialization functor.
00065 
00066    template <
00067       typename dataT,
00068       typename istreamT = std::ifstream,
00069       typename ostreamT = std::ofstream
00070    >
00071    class Serializer : public SerializerBase
00072    {
00073    public:
00074       bool Serialise(std::string const & key, dataT const & data) const
00075       {
00076          ostreamT oStream(key.c_str(), std::ios::binary | std::ios::trunc);
00077          bool bOk = oStream.is_open();
00078 
00079          if(bOk)
00080          {
00081             oStream.write(reinterpret_cast<char const *>(&data), sizeof(data));
00082             bOk = oStream.good();
00083             oStream.close();
00084          }
00085 
00086          return bOk;
00087       }
00088 
00089       bool Deserialise(std::string const & key, dataT & data) const
00090       {
00091          istreamT iStream(key.c_str(), std::ios::binary);
00092          bool bOk = iStream.is_open();
00093 
00094          if(bOk)
00095          {
00096             iStream.read(reinterpret_cast<char *>(&data), sizeof(data));
00097             bOk = iStream.good();
00098             iStream.close();
00099          }
00100 
00101          return bOk;
00102       }
00103    };
00104 
00105    //=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
00106    // This is the generic interface for transaction queues
00107 
00108    template <
00109       typename dataT
00110    >
00111    class ITransactionQueue
00112    {
00113    public:
00114       typedef dataT value_type;
00115 
00116       // The interface is polymorphic so any sub-classes can be interchanged dynamically at runtime
00117       virtual ~ITransactionQueue(){}
00118 
00119       virtual size_t size() const = 0;
00120       virtual bool empty() const = 0;
00121       virtual value_type & front() = 0;
00122       virtual void push_back(value_type const & data) = 0;
00123       virtual void pop_front() = 0;
00124    };
00125 
00126    //=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
   // A non-persisted queue that uses the ITransactionQueue interface. This
   // is provided to allow none, partial and full persistence to be swapped
   // dynamically at runtime depending upon settings
00130 
00131    template <
00132       typename dataT,
00133       template<typename T, typename A = std::allocator<T> > class queueT = std::deque
00134    >
00135    class NonePersistedTQ : public ITransactionQueue<dataT>
00136    {
00137    public:
00138       typedef dataT value_type;
00139 
00140       size_t size() const { return m_queue.size(); }
00141       bool empty() const { return m_queue.empty(); }
00142       value_type & front() { return m_queue.front(); }
00143       void push_back(value_type const & data) { m_queue.push_back(data); };
00144       void pop_front() { m_queue.pop_front(); };
00145    private:
00146       queueT<dataT> m_queue;
00147    };
00148 
00149    //=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
00150    // This is the base class for the Fully and Partially persisted queues
00151    // When the queue is instantiated it will check to see if items exist
00152    // in the backing store and they do it will automatically add them to
00153    // the front of the queue.
00154 
00155    template <
00156       typename dataT,
00157       typename serializerT = Serializer<dataT>,
00158       template<typename T, typename A = std::allocator<T> > class queueT = std::deque
00159    >
00160    class BasePersistedTQ : public ITransactionQueue<dataT>
00161    {
00162    public:
00163 
00164       typedef dataT value_type;
00165 
00166       BasePersistedTQ(std::string const & rootDir, std::string const & queueId) :
00167          m_nextKey(0), m_rootDir(rootDir), m_guid("15934E61-04A5-47cf-86FF-3E02F08F5931"), m_queueId(queueId)
00168       {
00169          //=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
00170          // Create a regex to compare the found files with what we actually want
00171          //=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
00172          std::stringstream ss;
00173          ss                                // Eg. ^15934E61-04A5-47cf-86FF-3E02F08F5931-([\dABCDEFGabcdefg]{8})\.myqueue$
00174             << "^" << m_guid               // Starts with our guid
00175             <<"-([\\dABCDEFGabcdefg]{8})"  // followed by 8 digit hexidecimal
00176             << "\\."                       // file extension seperator
00177             << m_queueId << "$";           // ends with file extension, using the queue id
00178          //=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
00179                                            //     [              guid                ] [ key  ] [ qid ]
00180          boost::regex re(ss.str());        // Eg. 15934E61-04A5-47cf-86FF-3E02F08F5931-0F0A0002.myqueue
00181          //=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
00182 
00183          boost::filesystem::directory_iterator dirItr;
00184 
00185          try
00186          {
00187             // As far as I can tell, this doesn't support globbing :(
00188             dirItr = boost::filesystem::directory_iterator(rootDir);
00189          }
00190          catch(std::exception const & e)
00191          {
00192             std::stringstream ss;
00193             ss
00194                << "Unable to open file queue: "
00195                << rootDir
00196                <<
00197                "("
00198                << e.what()
00199                << ")";
00200 
00201             throw std::runtime_error(ss.str());
00202          }
00203 
00204          for( ; dirItr != boost::filesystem::directory_iterator() ; ++ dirItr)
00205          {
00206             boost::smatch m;
00207 
00208             // We need to bind the leaf to a const reference as it's a temporary
00209             // but we need to to exist beyond the lifetime of the call.
00210             std::string const & fname = dirItr->path().leaf();
00211 
00212             if(boost::filesystem::is_regular(dirItr->status()) && boost::regex_match(fname, m, re) && m.size() > 1)
00213             {
00214                // Decode string hex value
00215                std::istringstream ss(m[1]);
00216                uint32_t key = uint32_t();
00217                ss >> std::hex >> key;
00218 
00219                dataT data = dataT();
00220 
00221                if(!m_serializer.Deserialise(dirItr->string(), data))
00222                {
00223                   throw std::runtime_error(std::string("Error loading file queue: ") + dirItr->string());
00224                }
00225 
00226                push_back_(key, data);
00227                m_nextKey = std::max(m_nextKey, key + 1);
00228             }
00229          }
00230       }
00231 
00232       ~BasePersistedTQ(){}
00233 
00234       size_t size() const { return m_queue.size(); }
00235 
00236       bool empty() const { return m_queue.empty(); }
00237 
00238       value_type & front()
00239       {
00240          return m_queue.front().second;
00241       }
00242 
00243       void push_back(value_type const & data)
00244       {
00245          push_back_(data);
00246       }
00247 
00248       void pop_front()
00249       {
00250          // Remove persisted item
00251          m_serializer.Purge(GenerateSerialiseKey(m_queue.front().first));
00252 
00253          // Remove from the queue
00254          m_queue.pop_front();
00255       }
00256 
00257    protected:
00258 
00259       std::string GenerateSerialiseKey(uint32_t key) const
00260       {
00261          std::stringstream ss;
00262 
00263          // We put the guid first as this allows the regex that matches paths
00264          // to short circuit and terminate early. It can abort as soon as the
00265          // guid, which is a fixed string, is found not to match!
00266          ss
00267             << m_guid << "-"
00268             << std::hex << std::setw(8) << std::setfill('0') << key
00269             << "." << m_queueId;
00270 
00271          return (m_rootDir / ss.str()).string();
00272       }
00273 
00274       void push_back_(uint32_t key, value_type const & data)
00275       {
00276          m_queue.push_back(std::make_pair(key, data));
00277       }
00278 
00279       uint32_t push_back_(value_type const & data)
00280       {
00281          uint32_t key = m_nextKey++;
00282          push_back_(key, data);
00283          return key;
00284       }
00285 
00286    protected:
00287       queueT<std::pair<uint32_t, dataT> > m_queue;
00288       serializerT m_serializer;
00289 
00290    private:
00291       uint32_t m_nextKey;
00292       boost::filesystem::path m_rootDir;
00293       std::string m_guid;
00294       std::string m_queueId;
00295    };
00296 
00297    //=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
00298    // This queue will persist the item at the front when it is requested
00299 
00300    template <
00301       typename dataT,
00302       typename serializerT = Serializer<dataT>,
00303       template<typename T, typename A = std::allocator<T> > class queueT = std::deque
00304    >
00305    class PartiallyPersistedTQ : public BasePersistedTQ<dataT, serializerT, queueT>
00306    {
00307    public:
00308       PartiallyPersistedTQ(
00309          std::string const & rootDir,
00310          std::string const & queueId
00311          ) : BasePersistedTQ<dataT, serializerT, queueT>(rootDir, queueId) {}
00312 
00313       dataT & front()
00314       {
00315          // First we get the key and data item from the queue
00316          uint32_t key = this->m_queue.front().first;
00317          dataT & data = this->m_queue.front().second;
00318 
00319          // Now we serialise the data
00320          this->m_serializer.Serialise(this->GenerateSerialiseKey(key), data);
00321 
00322          // Now we return the data
00323          return data;
00324       }
00325    };
00326 
00327    //=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
00328    // This queue will persist all items as they are added to the queue
00329 
00330    template <
00331       typename dataT,
00332       typename serializerT = Serializer<dataT>,
00333       template<typename T, typename A = std::allocator<T> > class queueT = std::deque
00334    >
00335    class FullyPersistedTQ : public BasePersistedTQ<dataT, serializerT, queueT >
00336    {
00337    public:
00338       FullyPersistedTQ(
00339          std::string const & rootDir,
00340          std::string const & queueId
00341          ) : BasePersistedTQ<dataT, serializerT, queueT>(rootDir, queueId) {}
00342 
00343 
00344       void push_back(dataT const & data)
00345       {
00346          // First we add it to the queue
00347          uint32_t key = this->push_back_(data);
00348 
00349          // And now it's in the queue we serialise
00350          this->m_serializer.Serialise(this->GenerateSerialiseKey(key), data);
00351       }
00352    };
00353    //=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
00354 
00355 }}
00356 
00357 #endif // MOOST_TRANSACTION_QUEUE_HPP__