libmoost
/home/mhx/git/github/libmoost/src/mq/stomp_client_impl.h
Go to the documentation of this file.
00001 /* vim:set ts=3 sw=3 sts=3 et: */
00028 #ifndef MOOST_MQ_STOMP_CLIENT_IMPL_H_
00029 #define MOOST_MQ_STOMP_CLIENT_IMPL_H_
00030 
00031 #include <string>
00032 #include <csignal>
00033 #include <map>
00034 #include <csignal>
00035 
00036 #include <boost/asio.hpp>
00037 #include <boost/enable_shared_from_this.hpp>
00038 #include <boost/thread.hpp>
00039 
00040 #include "stream_manager.h"
00041 #include "../../include/moost/mq/error.h"
00042 #include "../../include/moost/mq/stomp_client.h"
00043 
00044 namespace moost { namespace mq {
00045 
00046 class stomp_client::impl : public boost::enable_shared_from_this<stomp_client::impl>
00047 {
         // Implementation class nested in stomp_client. It owns an asio io_service,
         // a TCP socket, three deadline timers, a thread and a stream_manager (see
         // the data members at the bottom of the class).
         //
         // Derives from boost::enable_shared_from_this, so instances are expected to
         // be held in a boost::shared_ptr -- presumably so asynchronous handlers can
         // keep the object alive via shared_from_this(); confirm in the .cpp file.
         //
         // NOTE(review): this header uses boost::function and uint64_t without
         // including <boost/function.hpp> or a cstdint header directly; presumably
         // they arrive transitively through the other includes -- confirm.
00048 public:
         // Callback receiving an error code and a string.
         // NOTE(review): the string is presumably a human-readable description of
         // the error -- confirm against the call sites.
00049    typedef boost::function<void (const boost::system::error_code&, const std::string&)> error_cb_t;
00050 
         // consumer_pool_size is presumably forwarded to the stream manager
         // (TODO confirm); the two intervals match the const members
         // m_keepalive_interval and m_reconnect_interval declared below.
00051    impl(size_t consumer_pool_size,
00052         const boost::posix_time::time_duration& keepalive_interval,
00053         const boost::posix_time::time_duration& reconnect_interval);
00054    ~impl();
00055 
         // Connection management. error_cb has the same type as the m_error_cb
         // member, which presumably stores it -- confirm.
00056    void connect(const std::string& hostname, int port, error_cb_t error_cb);
00057    void disconnect();
00058 
         // Topic subscription management. message_cb uses the callback type
         // declared by `stream` (not visible in this header).
00059    void subscribe(const std::string& topic, stream::message_cb_t message_cb, stomp_client::ack::type ack_type,
00060                   const boost::posix_time::time_duration& max_msg_interval);
00061    void unsubscribe(const std::string& topic);
00062 
         // Sends `message` to `topic`.
00063    void send(const std::string& topic, const std::string& message);
00064 
         // Returns true in every state except state::disconnected, i.e. also while
         // the state is connecting or reconnecting.
00065    bool is_connected() const
00066    {
00067       return m_state != state::disconnected;
00068    }
00069 
         // Returns true only while the state is state::online.
00070    bool is_online() const
00071    {
00072       return m_state == state::online;
00073    }
00074 
         // Forwards to stream_manager::get_num_processed().
00075    uint64_t get_num_processed() const
00076    {
00077       return m_streams.get_num_processed();
00078    }
00079 
         // Forwards to stream_manager::get_num_pending().
00080    size_t get_num_pending() const
00081    {
00082       return m_streams.get_num_pending();
00083    }
00084 
00085 private:
         // Name/value header pairs, keyed and ordered by header name.
00086    typedef std::map<std::string, std::string> header_map;
00087 
         // Connection states; m_state is compared against these enumerators in
         // is_connected() and is_online(). The enum is wrapped in a struct so the
         // enumerators are scoped as state::xxx.
00088    struct state
00089    {
00090       enum type
00091       {
00092          disconnected,
00093          connecting,
00094          online,
00095          reconnecting
00096       };
00097    };
00098 
         // Protocol versions, scoped the same way as `state`.
         // NOTE(review): version10/version11 presumably mean STOMP 1.0/1.1, and
         // m_proto presumably holds one of these values -- confirm.
00099    struct protocol
00100    {
00101       enum type
00102       {
00103          undefined,
00104          version10,
00105          version11
00106       };
00107    };
00108 
         // Private overloads of the public connect()/subscribe().
         // NOTE(review): the parameterless connect() presumably uses the stored
         // m_hostname/m_port -- confirm against the implementation.
00109    boost::system::error_code connect();
00110    void reconnect();
00111    void subscribe(const std::string& topic, stomp_client::ack::type ack_type);
00112 
         // Frame sending. The first two overloads return an error_code, the
         // *_async variants return nothing, and the last overload reports through
         // the `ec` out-parameter.
         // NOTE(review): how these overloads delegate to one another, and whether
         // `ec` may be null, is not visible here -- check the .cpp file.
00113    boost::system::error_code send_to_queue(const std::string& command, const std::string& body = std::string());
00114    boost::system::error_code send_to_queue(const std::string& command, const header_map& headers, const std::string& body = std::string());
00115    void send_to_queue_async(const std::string& command, const std::string& body = std::string());
00116    void send_to_queue_async(const std::string& command, const header_map& headers, const std::string& body = std::string());
00117    void send_to_queue(const std::string& command, const header_map& headers, const std::string& body, boost::system::error_code *ec);
00118 
         // Frame receiving: command, headers and body are all output parameters.
00119    void receive_from_queue(std::string& command, header_map& headers, std::string& body);
00120 
         // Handlers. Those taking a boost::system::error_code are presumably
         // completion handlers for asio timer/socket operations -- confirm.
00121    void handle_keepalive(const boost::system::error_code& err);
00122    void handle_recv(const boost::system::error_code& err);
00123    void handle_reconnect(const boost::system::error_code& err);
00124    void handle_write(boost::shared_ptr<boost::asio::streambuf>, const boost::system::error_code& err);
00125    void handle_stomp_error(const header_map& headers, const std::string& body);
00126    void handle_dead_conn(const boost::system::error_code& err);
00127 
         // NOTE(review): presumably these start the asynchronous operations whose
         // results the handle_* functions above receive -- confirm.
00128    void recv_more();
00129    void keepalive();
00130    void dead_conn_detect();
00131 
00132    void on_message(const header_map& headers, const std::string& msg);
00133 
         // Converts a moost::mq error::type value into a boost::system::error_code.
00134    static boost::system::error_code make_error_code(error::type ec);
00135 
         // Intervals; const, so they can only be set in the constructor's
         // initializer list.
00136    const boost::posix_time::time_duration m_keepalive_interval;
00137    const boost::posix_time::time_duration m_reconnect_interval;
00138 
         // Connection parameters and error callback (same types as the arguments
         // of the public connect()).
00139    std::string m_hostname;
00140    int m_port;
00141    error_cb_t m_error_cb;
00142 
         // Asio objects. Non-static members are constructed in declaration order
         // and destroyed in reverse, so m_ios must stay ahead of the socket and
         // timers, which are presumably bound to it in the constructor -- confirm
         // before reordering anything here.
00143    boost::asio::io_service m_ios;
00144    boost::asio::ip::tcp::socket m_socket;
00145    boost::asio::streambuf m_response;
00146    boost::asio::deadline_timer m_keepalive_timer;
00147    boost::asio::deadline_timer m_reconnect_timer;
00148    boost::asio::deadline_timer m_dead_conn_timer;
         // An io_service::work object keeps io_service::run() from returning while
         // it exists. NOTE(review): presumably m_ios_thread runs m_ios and this
         // pointer is reset to let that loop finish -- confirm.
00149    boost::shared_ptr<boost::asio::io_service::work> m_ios_work;
00150    boost::thread m_ios_thread;
00151 
         // Supplies the processed/pending counters returned by the accessors above.
00152    stream_manager m_streams;
00153 
         // Current state (compared against state::type values above) and,
         // presumably, the current protocol::type value.
         // NOTE(review): `volatile` does not provide inter-thread synchronization.
         // If these are written on m_ios_thread and read from caller threads, an
         // atomic type may be more appropriate -- verify how they are accessed
         // before changing them.
00154    volatile sig_atomic_t m_state;
00155    volatile sig_atomic_t m_proto;
00156 };
00157 
00158 }}
00159 
00160 #endif