libmoost
/home/mhx/git/github/libmoost/src/mq/stream_manager.cpp
Go to the documentation of this file.
00001 /* vim:set ts=3 sw=3 sts=3 et: */
00028 #include "stream_manager.h"
00029 
00030 namespace moost { namespace mq {
00031 
00032 stream_manager::stream_manager(size_t consumer_pool_size)
00033    : m_running(1)
00034 {
00035    for (size_t i = 0; i < consumer_pool_size; ++i)
00036    {
00037       m_consumer_threads.create_thread(boost::bind(&stream_manager::consumer_thread, this));
00038    }
00039 }
00040 
00041 stream_manager::~stream_manager()
00042 {
00043    try
00044    {
00045       m_running = 0;
00046       m_cond_messages_list.notify_all();
00047       m_consumer_threads.join_all();
00048    }
00049    catch (...)
00050    {
00051    }
00052 }
00053 
00054 bool stream_manager::insert(const std::string& topic, stream::message_cb_t message_cb, stomp_client::ack::type ack_type,
00055                             const boost::posix_time::time_duration& max_msg_interval)
00056 {
00057    boost::mutex::scoped_lock lock(m_mx_streams);
00058 
00059    if (m_streams.count(topic))
00060    {
00061       return false;
00062    }
00063 
00064    m_streams[topic].reset(new stream(message_cb, ack_type, max_msg_interval));
00065 
00066    return true;
00067 }
00068 
00069 bool stream_manager::erase(const std::string& topic)
00070 {
00071    boost::mutex::scoped_lock lock(m_mx_streams);
00072 
00073    stream_map::iterator it = m_streams.find(topic);
00074 
00075    if (it == m_streams.end())
00076    {
00077       return false;
00078    }
00079 
00080    m_streams.erase(it);
00081 
00082    return true;
00083 }
00084 
00085 void stream_manager::clear()
00086 {
00087    boost::mutex::scoped_lock lock(m_mx_streams);
00088    m_streams.clear();
00089 }
00090 
00091 void stream_manager::get_list(std::vector<topic_stream_pair>& topics) const
00092 {
00093    boost::mutex::scoped_lock lock(m_mx_streams);
00094    std::copy(m_streams.begin(), m_streams.end(), std::back_inserter(topics));
00095 }
00096 
00097 bool stream_manager::push_message(const std::string& topic, const std::string& message, stomp_client::ack::type& ack_type)
00098 {
00099    {
00100       boost::mutex::scoped_lock lock(m_mx_num_processed);
00101       ++m_num_processed;
00102    }
00103 
00104    stream_ptr sp;
00105 
00106    {
00107       boost::mutex::scoped_lock lock(m_mx_streams);
00108       stream_map::const_iterator it = m_streams.find(topic);
00109 
00110       if (it != m_streams.end())
00111       {
00112          sp = it->second;
00113       }
00114       else
00115       {
00116          return false;
00117       }
00118    }
00119 
00120    {
00121       boost::mutex::scoped_lock lock(m_mx_messages_list);
00122       m_messages_list.push_back(std::make_pair(sp, message));
00123    }
00124 
00125    m_cond_messages_list.notify_one();
00126 
00127    ack_type = sp->ack_type();
00128 
00129    return true;
00130 }
00131 
00132 void stream_manager::consumer_thread()
00133 {
00134    while (m_running)
00135    {
00136       stream_message_pair smp;
00137 
00138       {
00139          boost::mutex::scoped_lock lock(m_mx_messages_list);
00140 
00141          if (m_messages_list.empty())
00142          {
00143             m_cond_messages_list.wait(lock);
00144          }
00145 
00146          if (m_messages_list.empty())
00147          {
00148             continue;
00149          }
00150 
00151          smp = m_messages_list.front();
00152          m_messages_list.pop_front();
00153       }
00154 
00155       smp.first->invoke(smp.second);
00156    }
00157 }
00158 
00159 bool stream_manager::max_msg_interval_exceeded() const
00160 {
00161    boost::mutex::scoped_lock lock(m_mx_streams);
00162 
00163    for (stream_map::const_iterator it = m_streams.begin(); it != m_streams.end(); ++it)
00164    {
00165       if (it->second->max_msg_interval_exceeded())
00166       {
00167          return true;
00168       }
00169    }
00170 
00171    return false;
00172 }
00173 
00174 }}