libmoost
|
00001 /* vim:set ts=3 sw=3 sts=3 et: */ 00028 #ifndef MOOST_MQ_STOMP_CLIENT_IMPL_H_ 00029 #define MOOST_MQ_STOMP_CLIENT_IMPL_H_ 00030 00031 #include <string> 00032 #include <csignal> 00033 #include <map> 00034 #include <csignal> 00035 00036 #include <boost/asio.hpp> 00037 #include <boost/enable_shared_from_this.hpp> 00038 #include <boost/thread.hpp> 00039 00040 #include "stream_manager.h" 00041 #include "../../include/moost/mq/error.h" 00042 #include "../../include/moost/mq/stomp_client.h" 00043 00044 namespace moost { namespace mq { 00045 00046 class stomp_client::impl : public boost::enable_shared_from_this<stomp_client::impl> 00047 { 00048 public: 00049 typedef boost::function<void (const boost::system::error_code&, const std::string&)> error_cb_t; 00050 00051 impl(size_t consumer_pool_size, 00052 const boost::posix_time::time_duration& keepalive_interval, 00053 const boost::posix_time::time_duration& reconnect_interval); 00054 ~impl(); 00055 00056 void connect(const std::string& hostname, int port, error_cb_t error_cb); 00057 void disconnect(); 00058 00059 void subscribe(const std::string& topic, stream::message_cb_t message_cb, stomp_client::ack::type ack_type, 00060 const boost::posix_time::time_duration& max_msg_interval); 00061 void unsubscribe(const std::string& topic); 00062 00063 void send(const std::string& topic, const std::string& message); 00064 00065 bool is_connected() const 00066 { 00067 return m_state != state::disconnected; 00068 } 00069 00070 bool is_online() const 00071 { 00072 return m_state == state::online; 00073 } 00074 00075 uint64_t get_num_processed() const 00076 { 00077 return m_streams.get_num_processed(); 00078 } 00079 00080 size_t get_num_pending() const 00081 { 00082 return m_streams.get_num_pending(); 00083 } 00084 00085 private: 00086 typedef std::map<std::string, std::string> header_map; 00087 00088 struct state 00089 { 00090 enum type 00091 { 00092 disconnected, 00093 connecting, 00094 online, 00095 reconnecting 00096 }; 00097 }; 00098 00099 struct protocol 00100 { 00101 enum type 00102 { 00103 undefined, 00104 version10, 00105 version11 00106 }; 00107 }; 00108 00109 boost::system::error_code connect(); 00110 void reconnect(); 00111 void subscribe(const std::string& topic, stomp_client::ack::type ack_type); 00112 00113 boost::system::error_code send_to_queue(const std::string& command, const std::string& body = std::string()); 00114 boost::system::error_code send_to_queue(const std::string& command, const header_map& headers, const std::string& body = std::string()); 00115 void send_to_queue_async(const std::string& command, const std::string& body = std::string()); 00116 void send_to_queue_async(const std::string& command, const header_map& headers, const std::string& body = std::string()); 00117 void send_to_queue(const std::string& command, const header_map& headers, const std::string& body, boost::system::error_code *ec); 00118 00119 void receive_from_queue(std::string& command, header_map& headers, std::string& body); 00120 00121 void handle_keepalive(const boost::system::error_code& err); 00122 void handle_recv(const boost::system::error_code& err); 00123 void handle_reconnect(const boost::system::error_code& err); 00124 void handle_write(boost::shared_ptr<boost::asio::streambuf>, const boost::system::error_code& err); 00125 void handle_stomp_error(const header_map& headers, const std::string& body); 00126 void handle_dead_conn(const boost::system::error_code& err); 00127 00128 void recv_more(); 00129 void keepalive(); 00130 void dead_conn_detect(); 00131 00132 void on_message(const header_map& headers, const std::string& msg); 00133 00134 static boost::system::error_code make_error_code(error::type ec); 00135 00136 const boost::posix_time::time_duration m_keepalive_interval; 00137 const boost::posix_time::time_duration m_reconnect_interval; 00138 00139 std::string m_hostname; 00140 int m_port; 00141 error_cb_t m_error_cb; 00142 00143 boost::asio::io_service m_ios; 00144 boost::asio::ip::tcp::socket m_socket; 00145 boost::asio::streambuf m_response; 00146 boost::asio::deadline_timer m_keepalive_timer; 00147 boost::asio::deadline_timer m_reconnect_timer; 00148 boost::asio::deadline_timer m_dead_conn_timer; 00149 boost::shared_ptr<boost::asio::io_service::work> m_ios_work; 00150 boost::thread m_ios_thread; 00151 00152 stream_manager m_streams; 00153 00154 volatile sig_atomic_t m_state; 00155 volatile sig_atomic_t m_proto; 00156 }; 00157 00158 }} 00159 00160 #endif