libmoost
|
00001 /* vim:set ts=3 sw=3 sts=3 et: */ 00028 #include <algorithm> 00029 00030 #include <boost/date_time/posix_time/time_formatters.hpp> 00031 00032 #include "../../include/moost/logging.hpp" 00033 #include "../../include/moost/utils/stringify.hpp" 00034 00035 #include "error_category.h" 00036 #include "stomp_client_impl.h" 00037 00038 namespace moost { namespace mq { 00039 00040 boost::system::error_code stomp_client::impl::make_error_code(error::type ec) 00041 { 00042 return boost::system::error_code(ec, mq_error_category()); 00043 } 00044 00045 stomp_client::impl::impl(size_t consumer_pool_size, 00046 const boost::posix_time::time_duration& keepalive_interval, 00047 const boost::posix_time::time_duration& reconnect_interval) 00048 : m_keepalive_interval(keepalive_interval) 00049 , m_reconnect_interval(reconnect_interval) 00050 , m_socket(m_ios) 00051 , m_keepalive_timer(m_ios) 00052 , m_reconnect_timer(m_ios) 00053 , m_dead_conn_timer(m_ios) 00054 , m_ios_work(new boost::asio::io_service::work(m_ios)) 00055 , m_ios_thread(boost::bind(&boost::asio::io_service::run, &m_ios)) 00056 , m_streams(consumer_pool_size) 00057 , m_state(state::disconnected) 00058 , m_proto(protocol::undefined) 00059 { 00060 } 00061 00062 stomp_client::impl::~impl() 00063 { 00064 try 00065 { 00066 m_ios_work.reset(); 00067 m_ios_thread.join(); 00068 } 00069 catch (...) 00070 { 00071 } 00072 } 00073 00074 void stomp_client::impl::connect(const std::string& hostname, int port, error_cb_t error_cb) 00075 { 00076 if (is_connected()) 00077 { 00078 throw std::runtime_error("already connected to " + m_hostname + ":" + boost::lexical_cast<std::string>(m_port)); 00079 } 00080 00081 m_hostname = hostname; 00082 m_port = port; 00083 m_error_cb = error_cb; 00084 m_state = state::connecting; 00085 reconnect(); 00086 } 00087 00088 boost::system::error_code stomp_client::impl::connect() 00089 { 00090 using namespace boost::asio::ip; 00091 00092 boost::system::error_code error; 00093 00094 tcp::resolver resolver(m_ios); 00095 tcp::resolver::query query(m_hostname, boost::lexical_cast<std::string>(m_port)); 00096 tcp::resolver::iterator it = resolver.resolve(query, error); 00097 tcp::resolver::iterator end; 00098 00099 if (!error) 00100 { 00101 while (it != end) 00102 { 00103 m_socket.connect(*it++, error); 00104 00105 if (!error) 00106 { 00107 break; 00108 } 00109 } 00110 00111 if (!error) 00112 { 00113 m_socket.set_option(tcp::acceptor::keep_alive(true)); 00114 00115 header_map headers; 00116 headers["accept-version"] = "1.0,1.1"; 00117 headers["host"] = m_hostname; 00118 00119 MLOG_CLASS_DEBUG("connecting to stomp queue at " << m_hostname << ":" << m_port); 00120 00121 error = send_to_queue("CONNECT", headers); 00122 00123 if (!error) 00124 { 00125 boost::asio::read_until(m_socket, m_response, char(0), error); 00126 00127 if (!error) 00128 { 00129 std::string command, body; 00130 00131 headers.clear(); 00132 receive_from_queue(command, headers, body); 00133 00134 MLOG_CLASS_TRACE("got " << command << " from stomp queue"); 00135 00136 if (command != "CONNECTED") 00137 { 00138 return make_error_code(error::connect_cmd_failed); 00139 } 00140 00141 MLOG_CLASS_TRACE("connected headers: " << moost::utils::stringify(headers)); 00142 00143 header_map::const_iterator it = headers.find("version"); 00144 m_proto = it != headers.end() && it->second == "1.1" ? protocol::version11 : protocol::version10; 00145 00146 m_state = state::online; 00147 00148 recv_more(); 00149 keepalive(); 00150 } 00151 } 00152 } 00153 } 00154 00155 return error; 00156 } 00157 00158 void stomp_client::impl::disconnect() 00159 { 00160 m_keepalive_timer.cancel(); 00161 m_reconnect_timer.cancel(); 00162 m_dead_conn_timer.cancel(); 00163 00164 send_to_queue("DISCONNECT"); 00165 00166 m_state = state::disconnected; 00167 m_proto = protocol::undefined; 00168 00169 m_socket.close(); 00170 m_streams.clear(); 00171 } 00172 00173 void stomp_client::impl::reconnect() 00174 { 00175 if (is_connected()) 00176 { 00177 MLOG_CLASS_TRACE("attempting to connect"); 00178 00179 boost::system::error_code ec = connect(); 00180 00181 if (!ec) 00182 { 00183 std::vector<stream_manager::topic_stream_pair> topics; 00184 m_streams.get_list(topics); 00185 00186 for (std::vector<stream_manager::topic_stream_pair>::const_iterator it = topics.begin(); it != topics.end(); ++it) 00187 { 00188 subscribe(it->first, it->second->ack_type()); 00189 it->second->reset_interval_timer(); 00190 } 00191 00192 dead_conn_detect(); 00193 } 00194 else 00195 { 00196 MLOG_CLASS_TRACE("connect failed: " << ec.message()); 00197 00198 if (m_state == state::connecting) 00199 { 00200 m_state = state::reconnecting; 00201 m_error_cb(ec, "connect failed"); 00202 } 00203 00204 m_reconnect_timer.expires_from_now(m_reconnect_interval); 00205 m_reconnect_timer.async_wait(boost::bind(&stomp_client::impl::handle_reconnect, shared_from_this(), boost::asio::placeholders::error)); 00206 } 00207 } 00208 } 00209 00210 void stomp_client::impl::subscribe(const std::string& topic, stream::message_cb_t message_cb, stomp_client::ack::type ack_type, 00211 const boost::posix_time::time_duration& max_msg_interval) 00212 { 00213 if (!is_connected()) 00214 { 00215 throw std::runtime_error("not connected"); 00216 } 00217 00218 if (!m_streams.insert(topic, message_cb, ack_type, max_msg_interval)) 00219 { 00220 throw std::runtime_error("already subscribed to " + topic); 00221 } 00222 00223 subscribe(topic, ack_type); 00224 } 00225 00226 void stomp_client::impl::subscribe(const std::string& topic, stomp_client::ack::type ack_type) 00227 { 00228 if (is_online()) 00229 { 00230 header_map headers; 00231 headers["destination"] = topic; 00232 headers["receipt"] = "subscribe:" + topic; 00233 switch (ack_type) 00234 { 00235 case stomp_client::ack::client: 00236 headers["ack"] = "client"; 00237 break; 00238 00239 default: 00240 headers["ack"] = "auto"; 00241 break; 00242 } 00243 send_to_queue_async("SUBSCRIBE", headers); 00244 } 00245 } 00246 00247 void stomp_client::impl::unsubscribe(const std::string& topic) 00248 { 00249 if (!is_connected()) 00250 { 00251 throw std::runtime_error("not connected"); 00252 } 00253 00254 if (!m_streams.erase(topic)) 00255 { 00256 throw std::runtime_error("not subscribed to " + topic); 00257 } 00258 00259 if (is_online()) 00260 { 00261 header_map headers; 00262 headers["destination"] = topic; 00263 send_to_queue_async("UNSUBSCRIBE", headers); 00264 } 00265 } 00266 00267 void stomp_client::impl::send(const std::string& topic, const std::string& message) 00268 { 00269 if (!is_connected()) 00270 { 00271 throw std::runtime_error("not connected"); 00272 } 00273 00274 header_map headers; 00275 headers["destination"] = topic; 00276 send_to_queue_async("SEND", headers, message); 00277 } 00278 00279 boost::system::error_code stomp_client::impl::send_to_queue(const std::string& command, const std::string& body) 00280 { 00281 boost::system::error_code ec; 00282 header_map headers; 00283 send_to_queue(command, headers, body, &ec); 00284 return ec; 00285 } 00286 00287 boost::system::error_code stomp_client::impl::send_to_queue(const std::string& command, const header_map& headers, const std::string& body) 00288 { 00289 boost::system::error_code ec; 00290 send_to_queue(command, headers, body, &ec); 00291 return ec; 00292 } 00293 00294 void stomp_client::impl::send_to_queue_async(const std::string& command, const std::string& body) 00295 { 00296 header_map headers; 00297 send_to_queue(command, headers, body, 0); 00298 } 00299 00300 void stomp_client::impl::send_to_queue_async(const std::string& command, const header_map& headers, const std::string& body) 00301 { 00302 send_to_queue(command, headers, body, 0); 00303 } 00304 00305 void stomp_client::impl::send_to_queue(const std::string& command, const header_map& headers, const std::string& body, boost::system::error_code *ec) 00306 { 00307 boost::shared_ptr<boost::asio::streambuf> request(new boost::asio::streambuf); 00308 std::ostream os(request.get()); 00309 00310 os << command << "\n"; 00311 00312 for (header_map::const_iterator i = headers.begin(); i != headers.end(); ++i) 00313 { 00314 os << i->first << ":" << i->second << "\n"; 00315 } 00316 00317 os << "\n" << body << char(0); 00318 00319 MLOG_CLASS_TRACE("sending " << command << " to stomp queue (headers: " << moost::utils::stringify(headers) << ") [async=" << (ec == 0) << "]"); 00320 00321 if (ec) 00322 { 00323 boost::asio::write(m_socket, *request, boost::asio::transfer_all(), *ec); 00324 } 00325 else 00326 { 00327 boost::asio::async_write(m_socket, *request, boost::asio::transfer_all(), 00328 boost::bind(&stomp_client::impl::handle_write, shared_from_this(), request, boost::asio::placeholders::error)); 00329 } 00330 } 00331 00332 void stomp_client::impl::receive_from_queue(std::string& command, header_map& headers, std::string& body) 00333 { 00334 std::istream is(&m_response); 00335 00336 while (std::getline(is, command) && command.empty()) 00337 { 00338 } 00339 00340 std::string header; 00341 00342 while (std::getline(is, header) && !header.empty()) 00343 { 00344 size_t sep = header.find(':'); 00345 const std::string& key = header.substr(0, sep); 00346 00347 if (headers.count(key) == 0) 00348 { 00349 headers[key] = header.substr(sep + 1); 00350 } 00351 } 00352 00353 std::getline(is, body, '\0'); 00354 } 00355 00356 void stomp_client::impl::keepalive() 00357 { 00358 m_keepalive_timer.expires_from_now(m_keepalive_interval); 00359 m_keepalive_timer.async_wait(boost::bind(&stomp_client::impl::handle_keepalive, shared_from_this(), boost::asio::placeholders::error)); 00360 } 00361 00362 void stomp_client::impl::dead_conn_detect() 00363 { 00364 m_dead_conn_timer.expires_from_now(boost::posix_time::milliseconds(500)); 00365 m_dead_conn_timer.async_wait(boost::bind(&stomp_client::impl::handle_dead_conn, shared_from_this(), boost::asio::placeholders::error)); 00366 } 00367 00368 void stomp_client::impl::handle_write(boost::shared_ptr<boost::asio::streambuf>, const boost::system::error_code& err) 00369 { 00370 if (err) 00371 { 00372 MLOG_CLASS_INFO("error while writing to stomp queue: " << err.message()); 00373 m_error_cb(err, "error writing to stomp queue"); 00374 } 00375 } 00376 00377 void stomp_client::impl::handle_keepalive(const boost::system::error_code& err) 00378 { 00379 if (err != boost::asio::error::operation_aborted) 00380 { 00381 MLOG_CLASS_TRACE("sending keepalive connect to stomp queue"); 00382 send_to_queue_async("CONNECT"); 00383 keepalive(); 00384 } 00385 } 00386 00387 void stomp_client::impl::handle_reconnect(const boost::system::error_code& err) 00388 { 00389 if (err != boost::asio::error::operation_aborted) 00390 { 00391 reconnect(); 00392 } 00393 } 00394 00395 void stomp_client::impl::handle_dead_conn(const boost::system::error_code& err) 00396 { 00397 if (err != boost::asio::error::operation_aborted) 00398 { 00399 if (m_streams.max_msg_interval_exceeded()) 00400 { 00401 MLOG_CLASS_INFO("max message interval exceeded, forcing reconnect"); 00402 m_error_cb(make_error_code(error::connection_lost), "max message interval exceeded"); 00403 00404 // this will trigger a reconnect 00405 m_state = state::reconnecting; 00406 m_socket.close(); 00407 } 00408 else 00409 { 00410 dead_conn_detect(); 00411 } 00412 } 00413 } 00414 00415 void stomp_client::impl::handle_stomp_error(const header_map& headers, const std::string& /*body*/) 00416 { 00417 header_map::const_iterator msg = headers.find("message"); 00418 00419 if (msg != headers.end()) 00420 { 00421 header_map::const_iterator rcpt = headers.find("receipt-id"); 00422 00423 if (rcpt != headers.end()) 00424 { 00425 if (rcpt->second.compare(0, 10, "subscribe:") == 0) 00426 { 00427 const std::string& topic = rcpt->second.substr(10); 00428 m_streams.erase(topic); 00429 m_error_cb(make_error_code(error::subscribe_failed), topic); 00430 return; 00431 } 00432 } 00433 00434 m_error_cb(make_error_code(error::queue_error), msg->second); 00435 } 00436 else 00437 { 00438 m_error_cb(make_error_code(error::queue_error), "unknown queue error"); 00439 } 00440 } 00441 00442 void stomp_client::impl::handle_recv(const boost::system::error_code& err) 00443 { 00444 if (!is_connected()) 00445 { 00446 return; 00447 } 00448 00449 if (err) 00450 { 00451 if (m_state != state::reconnecting) 00452 { 00453 MLOG_CLASS_INFO("error while receiving from stomp queue: " << err.message()); 00454 m_error_cb(make_error_code(error::connection_lost), err.message()); 00455 m_state = state::reconnecting; 00456 } 00457 00458 m_keepalive_timer.cancel(); 00459 m_dead_conn_timer.cancel(); 00460 reconnect(); 00461 00462 return; 00463 } 00464 00465 std::string command, body; 00466 header_map headers; 00467 receive_from_queue(command, headers, body); 00468 00469 MLOG_CLASS_TRACE("got " << command << " from stomp queue (headers: " << moost::utils::stringify(headers) << ")"); 00470 00471 if (command == "MESSAGE") 00472 { 00473 on_message(headers, body); 00474 } 00475 else if (command == "CONNECTED") 00476 { 00477 // ok, this is the normal keepalive response 00478 } 00479 else if (command == "ERROR") 00480 { 00481 handle_stomp_error(headers, body); 00482 } 00483 else if (command == "RECEIPT") 00484 { 00485 // there's no need to handle this, all went well 00486 } 00487 else 00488 { 00489 MLOG_CLASS_WARN("got unexpected " << command << " from stomp queue"); 00490 } 00491 00492 recv_more(); 00493 } 00494 00495 void stomp_client::impl::on_message(const header_map& headers, const std::string& msg) 00496 { 00497 header_map::const_iterator hit = headers.find("destination"); 00498 00499 if (hit != headers.end()) 00500 { 00501 stomp_client::ack::type ack_type; 00502 00503 if (m_streams.push_message(hit->second, msg, ack_type)) 00504 { 00505 if (ack_type == stomp_client::ack::client) 00506 { 00507 hit = headers.find("message-id"); 00508 00509 if (hit != headers.end()) 00510 { 00511 header_map ack_headers; 00512 ack_headers.insert(*hit); 00513 send_to_queue_async("ACK", ack_headers); 00514 } 00515 } 00516 } 00517 else 00518 { 00519 MLOG_CLASS_DEBUG("no stream found for topic: " + hit->second); 00520 } 00521 } 00522 else 00523 { 00524 MLOG_CLASS_WARN("skipping message without destination"); 00525 } 00526 } 00527 00528 void stomp_client::impl::recv_more() 00529 { 00530 boost::asio::async_read_until(m_socket, m_response, char(0), 00531 boost::bind(&stomp_client::impl::handle_recv, shared_from_this(), 00532 boost::asio::placeholders::error)); 00533 } 00534 00535 }}