// libmoost
00001 /* vim:set ts=3 sw=3 sts=3 et: */ 00028 #ifndef MOOST_TRANSACTION_HANDLER_HPP__ 00029 #define MOOST_TRANSACTION_HANDLER_HPP__ 00030 00031 // A very simple transaction handler, which queues jobs and attempts to commit 00032 // them using a user defined commit functor. If this functor returns false or 00033 // throws an exception the commit is deemed to have failed so the job is added 00034 // back to the job queue and retried later. 00035 00036 // Tip: For a persisted queue take a look a moost/transaction/queue.hpp 00037 00038 #include <csignal> 00039 00040 #include <boost/asio.hpp> 00041 #include <boost/bind.hpp> 00042 #include <boost/thread.hpp> 00043 #include <boost/thread/shared_mutex.hpp> 00044 #include <boost/shared_ptr.hpp> 00045 #include <boost/noncopyable.hpp> 00046 00047 namespace moost { namespace transaction { 00048 00049 //=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- 00050 // Async transaction handler 00051 //=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- 00052 // 00053 // queueT has the following interface requirements: 00054 // 00055 // Functions... 00056 // void push_back(value_type const &); 00057 // void pop_front(); 00058 // value_type & front(); 00059 // size_t size() const; 00060 // bool empty() const; 00061 // 00062 // Typedefs... 
00063 // value_type 00064 // 00065 //=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- 00066 00067 template < 00068 typename queueT, // See above for interface requirements 00069 typename commitFunctorT 00070 > 00071 class TransactionHandler : boost::noncopyable 00072 { 00073 public: 00074 TransactionHandler(queueT & queue, commitFunctorT commitFunctor) : 00075 m_sigRunning(0),m_queue(queue), m_commitFunctor(commitFunctor) 00076 { 00077 size_t qsize = m_queue.size(); 00078 00080 // Start transaction service 00081 m_spWork.reset(new boost::asio::io_service::work(m_ioService)); 00082 m_spThread.reset(new boost::thread(boost::bind(&boost::asio::io_service::run, &m_ioService))); 00084 00085 m_sigRunning = 1; 00086 00087 // Post jobs for currently queued items 00088 postJob(qsize); 00089 } 00090 00091 ~TransactionHandler() 00092 { 00093 // Stop backup service 00094 //=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- 00095 // The service will try to flush all items in the queue and will requeue 00096 // any that fail; however, it will only try and flush what was in the queue 00097 // when it was signalled to stop, failed items put to the back of the queue 00098 // will not be retried, so they'll be lost unless queue is fully persisted. 
00099 00100 m_sigRunning = 0; 00101 m_spWork.reset(); 00102 m_spThread->join(); 00103 } 00104 00105 typedef typename queueT::value_type value_type; 00106 00107 void post(value_type const data) 00108 { 00109 // Add data item to the queue 00110 { 00111 unique_lock_t rl(m_mtx); 00112 m_queue.push_back(data); 00113 } 00114 00115 postJob(); 00116 } 00117 00118 private: 00119 void postJob(size_t postCnt = 1) 00120 { 00121 // Tell asio we have jobs to process as long as the service is running 00122 for(size_t post = 0 ; m_sigRunning && (post < postCnt) ; ++post) 00123 { 00124 m_ioService.post( 00125 boost::bind(&TransactionHandler<queueT, commitFunctorT>::CommitHandler, this) 00126 ); 00127 } 00128 } 00129 00130 void CommitHandler() 00131 { 00132 // TODO: Should this drain the queue? Currently it'll process one 00133 // item at a time, relying on the number of asio posts to 00134 // be in sync with the number of queued items. Would forcing 00135 // the full queue to be processed each post be better? It does 00136 // mean the remaining asio posts will be noops, potentially, 00137 // but it also means the queue will get fully processed. 00138 bool bCommited = false; 00139 00140 // Take a copy of the data item so we can unlock mutex asap 00141 value_type data; 00142 { 00143 read_lock_t l(m_mtx); 00144 data = m_queue.front(); 00145 } 00146 00147 try 00148 { 00149 // Attempt commit 00150 bCommited = m_commitFunctor(data); 00151 } 00152 catch(...) 
{ } 00153 00154 // If there was an issue during the commit requeue 00155 if(!bCommited) { post(data); } 00156 00157 // Remove current item from the front 00158 { 00159 unique_lock_t rl(m_mtx); 00160 m_queue.pop_front(); 00161 } 00162 } 00163 00164 private: 00165 std::sig_atomic_t volatile m_sigRunning; 00166 queueT & m_queue; 00167 commitFunctorT m_commitFunctor; 00168 00169 boost::asio::io_service m_ioService; 00170 boost::shared_ptr<boost::asio::io_service::work> m_spWork; 00171 boost::shared_ptr<boost::thread> m_spThread; 00172 00173 typedef boost::shared_lock<boost::shared_mutex> read_lock_t; 00174 typedef boost::upgrade_lock<boost::shared_mutex> upgrade_lock_t; 00175 typedef boost::upgrade_to_unique_lock<boost::shared_mutex> upgrade_to_unique_lock; 00176 typedef boost::unique_lock<boost::shared_mutex> unique_lock_t; 00177 boost::shared_mutex m_mtx; 00178 }; 00179 00180 //=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- 00181 00182 }} 00183 00184 #endif // MOOST_TRANSACTION_HANDLER_HPP__