libmoost
|
00001 /* vim:set ts=3 sw=3 sts=3 et: */ 00028 #include "stream_manager.h" 00029 00030 namespace moost { namespace mq { 00031 00032 stream_manager::stream_manager(size_t consumer_pool_size) 00033 : m_running(1) 00034 { 00035 for (size_t i = 0; i < consumer_pool_size; ++i) 00036 { 00037 m_consumer_threads.create_thread(boost::bind(&stream_manager::consumer_thread, this)); 00038 } 00039 } 00040 00041 stream_manager::~stream_manager() 00042 { 00043 try 00044 { 00045 m_running = 0; 00046 m_cond_messages_list.notify_all(); 00047 m_consumer_threads.join_all(); 00048 } 00049 catch (...) 00050 { 00051 } 00052 } 00053 00054 bool stream_manager::insert(const std::string& topic, stream::message_cb_t message_cb, stomp_client::ack::type ack_type, 00055 const boost::posix_time::time_duration& max_msg_interval) 00056 { 00057 boost::mutex::scoped_lock lock(m_mx_streams); 00058 00059 if (m_streams.count(topic)) 00060 { 00061 return false; 00062 } 00063 00064 m_streams[topic].reset(new stream(message_cb, ack_type, max_msg_interval)); 00065 00066 return true; 00067 } 00068 00069 bool stream_manager::erase(const std::string& topic) 00070 { 00071 boost::mutex::scoped_lock lock(m_mx_streams); 00072 00073 stream_map::iterator it = m_streams.find(topic); 00074 00075 if (it == m_streams.end()) 00076 { 00077 return false; 00078 } 00079 00080 m_streams.erase(it); 00081 00082 return true; 00083 } 00084 00085 void stream_manager::clear() 00086 { 00087 boost::mutex::scoped_lock lock(m_mx_streams); 00088 m_streams.clear(); 00089 } 00090 00091 void stream_manager::get_list(std::vector<topic_stream_pair>& topics) const 00092 { 00093 boost::mutex::scoped_lock lock(m_mx_streams); 00094 std::copy(m_streams.begin(), m_streams.end(), std::back_inserter(topics)); 00095 } 00096 00097 bool stream_manager::push_message(const std::string& topic, const std::string& message, stomp_client::ack::type& ack_type) 00098 { 00099 { 00100 boost::mutex::scoped_lock lock(m_mx_num_processed); 00101 ++m_num_processed; 00102 } 00103 00104 stream_ptr sp; 00105 00106 { 00107 boost::mutex::scoped_lock lock(m_mx_streams); 00108 stream_map::const_iterator it = m_streams.find(topic); 00109 00110 if (it != m_streams.end()) 00111 { 00112 sp = it->second; 00113 } 00114 else 00115 { 00116 return false; 00117 } 00118 } 00119 00120 { 00121 boost::mutex::scoped_lock lock(m_mx_messages_list); 00122 m_messages_list.push_back(std::make_pair(sp, message)); 00123 } 00124 00125 m_cond_messages_list.notify_one(); 00126 00127 ack_type = sp->ack_type(); 00128 00129 return true; 00130 } 00131 00132 void stream_manager::consumer_thread() 00133 { 00134 while (m_running) 00135 { 00136 stream_message_pair smp; 00137 00138 { 00139 boost::mutex::scoped_lock lock(m_mx_messages_list); 00140 00141 if (m_messages_list.empty()) 00142 { 00143 m_cond_messages_list.wait(lock); 00144 } 00145 00146 if (m_messages_list.empty()) 00147 { 00148 continue; 00149 } 00150 00151 smp = m_messages_list.front(); 00152 m_messages_list.pop_front(); 00153 } 00154 00155 smp.first->invoke(smp.second); 00156 } 00157 } 00158 00159 bool stream_manager::max_msg_interval_exceeded() const 00160 { 00161 boost::mutex::scoped_lock lock(m_mx_streams); 00162 00163 for (stream_map::const_iterator it = m_streams.begin(); it != m_streams.end(); ++it) 00164 { 00165 if (it->second->max_msg_interval_exceeded()) 00166 { 00167 return true; 00168 } 00169 } 00170 00171 return false; 00172 } 00173 00174 }}