libmoost
|
00001 /* vim:set ts=3 sw=3 sts=3 et: */ 00028 #ifndef MOOST_MQ_STREAM_MANAGER_H_ 00029 #define MOOST_MQ_STREAM_MANAGER_H_ 00030 00031 #include <deque> 00032 #include <string> 00033 #include <csignal> 00034 00035 #include <boost/cstdint.hpp> 00036 #include <boost/shared_ptr.hpp> 00037 #include <boost/thread.hpp> 00038 00039 #include "stream.hpp" 00040 00041 namespace moost { namespace mq { 00042 00043 class stream_manager 00044 { 00045 public: 00046 typedef boost::shared_ptr<stream> stream_ptr; 00047 typedef std::pair<std::string, stream_ptr> topic_stream_pair; 00048 00049 stream_manager(size_t consumer_pool_size); 00050 ~stream_manager(); 00051 00052 bool insert(const std::string& topic, stream::message_cb_t message_cb, stomp_client::ack::type ack_type, 00053 const boost::posix_time::time_duration& max_msg_interval); 00054 bool erase(const std::string& topic); 00055 void clear(); 00056 00057 void get_list(std::vector<topic_stream_pair>& topics) const; 00058 00059 bool push_message(const std::string& topic, const std::string& message, stomp_client::ack::type& ack_type); 00060 00061 uint64_t get_num_processed() const 00062 { 00063 boost::mutex::scoped_lock lock(m_mx_num_processed); 00064 return m_num_processed; 00065 } 00066 00067 size_t get_num_pending() const 00068 { 00069 boost::mutex::scoped_lock lock(m_mx_messages_list); 00070 return m_messages_list.size(); 00071 } 00072 00073 bool max_msg_interval_exceeded() const; 00074 00075 private: 00076 typedef std::map<std::string, stream_ptr> stream_map; 00077 typedef std::pair<stream_ptr, std::string> stream_message_pair; 00078 00079 void consumer_thread(); 00080 00081 stream_map m_streams; 00082 mutable boost::mutex m_mx_streams; 00083 00084 volatile uint64_t m_num_processed; 00085 mutable boost::mutex m_mx_num_processed; 00086 00087 mutable boost::mutex m_mx_messages_list; 00088 boost::condition_variable m_cond_messages_list; 00089 std::deque<stream_message_pair> m_messages_list; 00090 00091 boost::thread_group m_consumer_threads; 00092 00093 volatile sig_atomic_t m_running; 00094 }; 00095 00096 }} 00097 00098 #endif