libmoost
|
00001 /* vim:set ts=3 sw=3 sts=3 et: */ 00028 #ifndef MOOST_MQ_STREAM_H_ 00029 #define MOOST_MQ_STREAM_H_ 00030 00031 #include <string> 00032 00033 #include <boost/function.hpp> 00034 #include <boost/date_time/posix_time/posix_time.hpp> 00035 00036 #include "../../include/moost/mq/stomp_client.h" 00037 00038 namespace moost { namespace mq { 00039 00040 class stream 00041 { 00042 public: 00043 typedef boost::function<void (const std::string&)> message_cb_t; 00044 00045 stream(const message_cb_t& cb, stomp_client::ack::type ack_type, 00046 const boost::posix_time::time_duration& max_msg_interval) 00047 : m_callback(cb) 00048 , m_last_invoke(boost::posix_time::microsec_clock::universal_time()) 00049 , m_ack_type(ack_type) 00050 , m_max_msg_interval(max_msg_interval) 00051 { 00052 } 00053 00054 void invoke(const std::string& message) 00055 { 00056 reset_interval_timer(); 00057 m_callback(message); 00058 } 00059 00060 void reset_interval_timer() 00061 { 00062 // we can save a call if we never need the value in m_last_invoke 00063 00064 if (!m_max_msg_interval.is_pos_infinity()) 00065 { 00066 m_last_invoke = boost::posix_time::microsec_clock::universal_time(); 00067 } 00068 } 00069 00070 stomp_client::ack::type ack_type() const 00071 { 00072 return m_ack_type; 00073 } 00074 00075 const boost::posix_time::time_duration& max_msg_interval() 00076 { 00077 return m_max_msg_interval; 00078 } 00079 00080 bool max_msg_interval_exceeded() const 00081 { 00082 if (m_max_msg_interval.is_pos_infinity()) 00083 { 00084 return false; 00085 } 00086 00087 const boost::posix_time::ptime& now = boost::posix_time::microsec_clock::universal_time(); 00088 return now - m_last_invoke > m_max_msg_interval; 00089 } 00090 00091 private: 00092 message_cb_t m_callback; 00093 boost::posix_time::ptime m_last_invoke; 00094 00095 const stomp_client::ack::type m_ack_type; 00096 const boost::posix_time::time_duration m_max_msg_interval; 00097 }; 00098 00099 }} 00100 00101 #endif