libmoost
/home/mhx/git/github/libmoost/include/moost/transaction/handler.hpp
Go to the documentation of this file.
00001 /* vim:set ts=3 sw=3 sts=3 et: */
00028 #ifndef MOOST_TRANSACTION_HANDLER_HPP__
00029 #define MOOST_TRANSACTION_HANDLER_HPP__
00030 
00031 // A very simple transaction handler, which queues jobs and attempts to commit
00032 // them using a user defined commit functor. If this functor returns false or
00033 // throws an exception the commit is deemed to have failed so the job is added
00034 // back to the job queue and retried later.
00035 
00036 // Tip: For a persisted queue take a look at moost/transaction/queue.hpp
00037 
00038 #include <csignal>
00039 
00040 #include <boost/asio.hpp>
00041 #include <boost/bind.hpp>
00042 #include <boost/thread.hpp>
00043 #include <boost/thread/shared_mutex.hpp>
00044 #include <boost/shared_ptr.hpp>
00045 #include <boost/noncopyable.hpp>
00046 
00047 namespace moost { namespace transaction {
00048 
00049 //=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
00050 // Async transaction handler
00051 //=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
00052 //
00053 // queueT has the following interface requirements:
00054 //
00055 //   Functions...
00056 //      void push_back(value_type const &);
00057 //      void pop_front();
00058 //      value_type & front();
00059 //      size_t size() const;
00060 //      bool empty() const;
00061 //
00062 //   Typedefs...
00063 //      value_type
00064 //
00065 //=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
00066 
00067 template <
00068    typename queueT, // See above for interface requirements
00069    typename commitFunctorT
00070    >
00071 class TransactionHandler : boost::noncopyable
00072 {
00073 public:
00074    TransactionHandler(queueT & queue, commitFunctorT commitFunctor) :
00075       m_sigRunning(0),m_queue(queue), m_commitFunctor(commitFunctor)
00076    {
00077       size_t qsize = m_queue.size();
00078 
00080       // Start transaction service
00081       m_spWork.reset(new boost::asio::io_service::work(m_ioService));
00082       m_spThread.reset(new boost::thread(boost::bind(&boost::asio::io_service::run, &m_ioService)));
00084 
00085       m_sigRunning = 1;
00086 
00087       // Post jobs for currently queued items
00088       postJob(qsize);
00089    }
00090 
00091    ~TransactionHandler()
00092    {
00093       // Stop backup service
00094       //=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
00095       // The service will try to flush all items in the queue and will requeue
00096       // any that fail; however, it will only try and flush what was in the queue
00097       // when it was signalled to stop, failed items put to the back of the queue
00098       // will not be retried, so they'll be lost unless queue is fully persisted.
00099 
00100       m_sigRunning = 0;
00101       m_spWork.reset();
00102       m_spThread->join();
00103    }
00104 
00105    typedef typename queueT::value_type value_type;
00106 
00107    void post(value_type const data)
00108    {
00109       // Add data item to the queue
00110       {
00111          unique_lock_t rl(m_mtx);
00112          m_queue.push_back(data);
00113       }
00114 
00115       postJob();
00116     }
00117 
00118 private:
00119    void postJob(size_t postCnt = 1)
00120    {
00121       // Tell asio we have jobs to process as long as the service is running
00122       for(size_t post = 0 ; m_sigRunning && (post < postCnt) ; ++post)
00123       {
00124          m_ioService.post(
00125             boost::bind(&TransactionHandler<queueT, commitFunctorT>::CommitHandler, this)
00126          );
00127       }
00128    }
00129 
00130    void CommitHandler()
00131    {
00132       // TODO: Should this drain the queue? Currently it'll process one
00133       //       item at a time, relying on the number of asio posts to
00134       //       be in sync with the number of queued items. Would forcing
00135       //       the full queue to be processed each post be better? It does
00136       //       mean the remaining asio posts will be noops, potentially,
00137       //       but it also means the queue will get fully processed.
00138       bool bCommited = false;
00139 
00140       // Take a copy of the data item so we can unlock mutex asap
00141       value_type data;
00142       {
00143          read_lock_t l(m_mtx);
00144          data = m_queue.front();
00145       }
00146 
00147       try
00148       {
00149          // Attempt commit
00150          bCommited = m_commitFunctor(data);
00151       }
00152       catch(...) { }
00153 
00154       // If there was an issue during the commit requeue
00155       if(!bCommited) { post(data); }
00156 
00157       // Remove current item from the front
00158       {
00159          unique_lock_t rl(m_mtx);
00160          m_queue.pop_front();
00161       }
00162    }
00163 
00164 private:
00165    std::sig_atomic_t volatile m_sigRunning;
00166    queueT & m_queue;
00167    commitFunctorT m_commitFunctor;
00168 
00169    boost::asio::io_service m_ioService;
00170    boost::shared_ptr<boost::asio::io_service::work> m_spWork;
00171    boost::shared_ptr<boost::thread> m_spThread;
00172 
00173    typedef boost::shared_lock<boost::shared_mutex> read_lock_t;
00174    typedef boost::upgrade_lock<boost::shared_mutex> upgrade_lock_t;
00175    typedef boost::upgrade_to_unique_lock<boost::shared_mutex> upgrade_to_unique_lock;
00176    typedef boost::unique_lock<boost::shared_mutex> unique_lock_t;
00177    boost::shared_mutex m_mtx;
00178 };
00179 
00180 //=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
00181 
00182 }}
00183 
00184 #endif // MOOST_TRANSACTION_HANDLER_HPP__