libmoost
/home/mhx/git/github/libmoost/src/mq/stream_manager.h
Go to the documentation of this file.
00001 /* vim:set ts=3 sw=3 sts=3 et: */
00028 #ifndef MOOST_MQ_STREAM_MANAGER_H_
00029 #define MOOST_MQ_STREAM_MANAGER_H_
00030 
00031 #include <deque>
00032 #include <string>
00033 #include <csignal>
00034 
00035 #include <boost/cstdint.hpp>
00036 #include <boost/shared_ptr.hpp>
00037 #include <boost/thread.hpp>
00038 
00039 #include "stream.hpp"
00040 
00041 namespace moost { namespace mq {
00042 
00043 class stream_manager
00044 {
00045 public:
00046    typedef boost::shared_ptr<stream> stream_ptr;
00047    typedef std::pair<std::string, stream_ptr> topic_stream_pair;
00048 
00049    stream_manager(size_t consumer_pool_size);
00050    ~stream_manager();
00051 
00052    bool insert(const std::string& topic, stream::message_cb_t message_cb, stomp_client::ack::type ack_type,
00053                const boost::posix_time::time_duration& max_msg_interval);
00054    bool erase(const std::string& topic);
00055    void clear();
00056 
00057    void get_list(std::vector<topic_stream_pair>& topics) const;
00058 
00059    bool push_message(const std::string& topic, const std::string& message, stomp_client::ack::type& ack_type);
00060 
00061    uint64_t get_num_processed() const
00062    {
00063       boost::mutex::scoped_lock lock(m_mx_num_processed);
00064       return m_num_processed;
00065    }
00066 
00067    size_t get_num_pending() const
00068    {
00069       boost::mutex::scoped_lock lock(m_mx_messages_list);
00070       return m_messages_list.size();
00071    }
00072 
00073    bool max_msg_interval_exceeded() const;
00074 
00075 private:
00076    typedef std::map<std::string, stream_ptr> stream_map;
00077    typedef std::pair<stream_ptr, std::string> stream_message_pair;
00078 
00079    void consumer_thread();
00080 
00081    stream_map m_streams;
00082    mutable boost::mutex m_mx_streams;
00083 
00084    volatile uint64_t m_num_processed;
00085    mutable boost::mutex m_mx_num_processed;
00086 
00087    mutable boost::mutex m_mx_messages_list;
00088    boost::condition_variable m_cond_messages_list;
00089    std::deque<stream_message_pair> m_messages_list;
00090 
00091    boost::thread_group m_consumer_threads;
00092 
00093    volatile sig_atomic_t m_running;
00094 };
00095 
00096 }}
00097 
00098 #endif