libmoost
/home/mhx/git/github/libmoost/src/mq/stomp_client_impl.cpp
Go to the documentation of this file.
00001 /* vim:set ts=3 sw=3 sts=3 et: */
00028 #include <algorithm>
00029 
00030 #include <boost/date_time/posix_time/time_formatters.hpp>
00031 
00032 #include "../../include/moost/logging.hpp"
00033 #include "../../include/moost/utils/stringify.hpp"
00034 
00035 #include "error_category.h"
00036 #include "stomp_client_impl.h"
00037 
00038 namespace moost { namespace mq {
00039 
00040 boost::system::error_code stomp_client::impl::make_error_code(error::type ec)
00041 {
00042    return boost::system::error_code(ec, mq_error_category());
00043 }
00044 
00045 stomp_client::impl::impl(size_t consumer_pool_size,
00046                          const boost::posix_time::time_duration& keepalive_interval,
00047                          const boost::posix_time::time_duration& reconnect_interval)
00048    : m_keepalive_interval(keepalive_interval)
00049    , m_reconnect_interval(reconnect_interval)
00050    , m_socket(m_ios)
00051    , m_keepalive_timer(m_ios)
00052    , m_reconnect_timer(m_ios)
00053    , m_dead_conn_timer(m_ios)
00054    , m_ios_work(new boost::asio::io_service::work(m_ios))
00055    , m_ios_thread(boost::bind(&boost::asio::io_service::run, &m_ios))
00056    , m_streams(consumer_pool_size)
00057    , m_state(state::disconnected)
00058    , m_proto(protocol::undefined)
00059 {
00060 }
00061 
00062 stomp_client::impl::~impl()
00063 {
00064    try
00065    {
00066       m_ios_work.reset();
00067       m_ios_thread.join();
00068    }
00069    catch (...)
00070    {
00071    }
00072 }
00073 
00074 void stomp_client::impl::connect(const std::string& hostname, int port, error_cb_t error_cb)
00075 {
00076    if (is_connected())
00077    {
00078       throw std::runtime_error("already connected to " + m_hostname + ":" + boost::lexical_cast<std::string>(m_port));
00079    }
00080 
00081    m_hostname = hostname;
00082    m_port = port;
00083    m_error_cb = error_cb;
00084    m_state = state::connecting;
00085    reconnect();
00086 }
00087 
00088 boost::system::error_code stomp_client::impl::connect()
00089 {
00090    using namespace boost::asio::ip;
00091 
00092    boost::system::error_code error;
00093 
00094    tcp::resolver resolver(m_ios);
00095    tcp::resolver::query query(m_hostname, boost::lexical_cast<std::string>(m_port));
00096    tcp::resolver::iterator it = resolver.resolve(query, error);
00097    tcp::resolver::iterator end;
00098 
00099    if (!error)
00100    {
00101       while (it != end)
00102       {
00103          m_socket.connect(*it++, error);
00104 
00105          if (!error)
00106          {
00107             break;
00108          }
00109       }
00110 
00111       if (!error)
00112       {
00113          m_socket.set_option(tcp::acceptor::keep_alive(true));
00114 
00115          header_map headers;
00116          headers["accept-version"] = "1.0,1.1";
00117          headers["host"] = m_hostname;
00118 
00119          MLOG_CLASS_DEBUG("connecting to stomp queue at " << m_hostname << ":" << m_port);
00120 
00121          error = send_to_queue("CONNECT", headers);
00122 
00123          if (!error)
00124          {
00125             boost::asio::read_until(m_socket, m_response, char(0), error);
00126 
00127             if (!error)
00128             {
00129                std::string command, body;
00130 
00131                headers.clear();
00132                receive_from_queue(command, headers, body);
00133 
00134                MLOG_CLASS_TRACE("got " << command << " from stomp queue");
00135 
00136                if (command != "CONNECTED")
00137                {
00138                   return make_error_code(error::connect_cmd_failed);
00139                }
00140 
00141                MLOG_CLASS_TRACE("connected headers: " << moost::utils::stringify(headers));
00142 
00143                header_map::const_iterator it = headers.find("version");
00144                m_proto = it != headers.end() && it->second == "1.1" ? protocol::version11 : protocol::version10;
00145 
00146                m_state = state::online;
00147 
00148                recv_more();
00149                keepalive();
00150             }
00151          }
00152       }
00153    }
00154 
00155    return error;
00156 }
00157 
00158 void stomp_client::impl::disconnect()
00159 {
00160    m_keepalive_timer.cancel();
00161    m_reconnect_timer.cancel();
00162    m_dead_conn_timer.cancel();
00163 
00164    send_to_queue("DISCONNECT");
00165 
00166    m_state = state::disconnected;
00167    m_proto = protocol::undefined;
00168 
00169    m_socket.close();
00170    m_streams.clear();
00171 }
00172 
00173 void stomp_client::impl::reconnect()
00174 {
00175    if (is_connected())
00176    {
00177       MLOG_CLASS_TRACE("attempting to connect");
00178 
00179       boost::system::error_code ec = connect();
00180 
00181       if (!ec)
00182       {
00183          std::vector<stream_manager::topic_stream_pair> topics;
00184          m_streams.get_list(topics);
00185 
00186          for (std::vector<stream_manager::topic_stream_pair>::const_iterator it = topics.begin(); it != topics.end(); ++it)
00187          {
00188             subscribe(it->first, it->second->ack_type());
00189             it->second->reset_interval_timer();
00190          }
00191 
00192          dead_conn_detect();
00193       }
00194       else
00195       {
00196          MLOG_CLASS_TRACE("connect failed: " << ec.message());
00197 
00198          if (m_state == state::connecting)
00199          {
00200             m_state = state::reconnecting;
00201             m_error_cb(ec, "connect failed");
00202          }
00203 
00204          m_reconnect_timer.expires_from_now(m_reconnect_interval);
00205          m_reconnect_timer.async_wait(boost::bind(&stomp_client::impl::handle_reconnect, shared_from_this(), boost::asio::placeholders::error));
00206       }
00207    }
00208 }
00209 
00210 void stomp_client::impl::subscribe(const std::string& topic, stream::message_cb_t message_cb, stomp_client::ack::type ack_type,
00211                                    const boost::posix_time::time_duration& max_msg_interval)
00212 {
00213    if (!is_connected())
00214    {
00215       throw std::runtime_error("not connected");
00216    }
00217 
00218    if (!m_streams.insert(topic, message_cb, ack_type, max_msg_interval))
00219    {
00220       throw std::runtime_error("already subscribed to " + topic);
00221    }
00222 
00223    subscribe(topic, ack_type);
00224 }
00225 
00226 void stomp_client::impl::subscribe(const std::string& topic, stomp_client::ack::type ack_type)
00227 {
00228    if (is_online())
00229    {
00230       header_map headers;
00231       headers["destination"] = topic;
00232       headers["receipt"] = "subscribe:" + topic;
00233       switch (ack_type)
00234       {
00235          case stomp_client::ack::client:
00236             headers["ack"] = "client";
00237             break;
00238 
00239          default:
00240             headers["ack"] = "auto";
00241             break;
00242       }
00243       send_to_queue_async("SUBSCRIBE", headers);
00244    }
00245 }
00246 
00247 void stomp_client::impl::unsubscribe(const std::string& topic)
00248 {
00249    if (!is_connected())
00250    {
00251       throw std::runtime_error("not connected");
00252    }
00253 
00254    if (!m_streams.erase(topic))
00255    {
00256       throw std::runtime_error("not subscribed to " + topic);
00257    }
00258 
00259    if (is_online())
00260    {
00261       header_map headers;
00262       headers["destination"] = topic;
00263       send_to_queue_async("UNSUBSCRIBE", headers);
00264    }
00265 }
00266 
00267 void stomp_client::impl::send(const std::string& topic, const std::string& message)
00268 {
00269    if (!is_connected())
00270    {
00271       throw std::runtime_error("not connected");
00272    }
00273 
00274    header_map headers;
00275    headers["destination"] = topic;
00276    send_to_queue_async("SEND", headers, message);
00277 }
00278 
00279 boost::system::error_code stomp_client::impl::send_to_queue(const std::string& command, const std::string& body)
00280 {
00281    boost::system::error_code ec;
00282    header_map headers;
00283    send_to_queue(command, headers, body, &ec);
00284    return ec;
00285 }
00286 
00287 boost::system::error_code stomp_client::impl::send_to_queue(const std::string& command, const header_map& headers, const std::string& body)
00288 {
00289    boost::system::error_code ec;
00290    send_to_queue(command, headers, body, &ec);
00291    return ec;
00292 }
00293 
00294 void stomp_client::impl::send_to_queue_async(const std::string& command, const std::string& body)
00295 {
00296    header_map headers;
00297    send_to_queue(command, headers, body, 0);
00298 }
00299 
00300 void stomp_client::impl::send_to_queue_async(const std::string& command, const header_map& headers, const std::string& body)
00301 {
00302    send_to_queue(command, headers, body, 0);
00303 }
00304 
00305 void stomp_client::impl::send_to_queue(const std::string& command, const header_map& headers, const std::string& body, boost::system::error_code *ec)
00306 {
00307    boost::shared_ptr<boost::asio::streambuf> request(new boost::asio::streambuf);
00308    std::ostream os(request.get());
00309 
00310    os << command << "\n";
00311 
00312    for (header_map::const_iterator i = headers.begin(); i != headers.end(); ++i)
00313    {
00314       os << i->first << ":" << i->second << "\n";
00315    }
00316 
00317    os << "\n" << body << char(0);
00318 
00319    MLOG_CLASS_TRACE("sending " << command << " to stomp queue (headers: " << moost::utils::stringify(headers) << ") [async=" << (ec == 0) << "]");
00320 
00321    if (ec)
00322    {
00323       boost::asio::write(m_socket, *request, boost::asio::transfer_all(), *ec);
00324    }
00325    else
00326    {
00327       boost::asio::async_write(m_socket, *request, boost::asio::transfer_all(),
00328          boost::bind(&stomp_client::impl::handle_write, shared_from_this(), request, boost::asio::placeholders::error));
00329    }
00330 }
00331 
00332 void stomp_client::impl::receive_from_queue(std::string& command, header_map& headers, std::string& body)
00333 {
00334    std::istream is(&m_response);
00335 
00336    while (std::getline(is, command) && command.empty())
00337    {
00338    }
00339 
00340    std::string header;
00341 
00342    while (std::getline(is, header) && !header.empty())
00343    {
00344       size_t sep = header.find(':');
00345       const std::string& key = header.substr(0, sep);
00346 
00347       if (headers.count(key) == 0)
00348       {
00349          headers[key] = header.substr(sep + 1);
00350       }
00351    }
00352 
00353    std::getline(is, body, '\0');
00354 }
00355 
00356 void stomp_client::impl::keepalive()
00357 {
00358    m_keepalive_timer.expires_from_now(m_keepalive_interval);
00359    m_keepalive_timer.async_wait(boost::bind(&stomp_client::impl::handle_keepalive, shared_from_this(), boost::asio::placeholders::error));
00360 }
00361 
00362 void stomp_client::impl::dead_conn_detect()
00363 {
00364    m_dead_conn_timer.expires_from_now(boost::posix_time::milliseconds(500));
00365    m_dead_conn_timer.async_wait(boost::bind(&stomp_client::impl::handle_dead_conn, shared_from_this(), boost::asio::placeholders::error));
00366 }
00367 
00368 void stomp_client::impl::handle_write(boost::shared_ptr<boost::asio::streambuf>, const boost::system::error_code& err)
00369 {
00370    if (err)
00371    {
00372       MLOG_CLASS_INFO("error while writing to stomp queue: " << err.message());
00373       m_error_cb(err, "error writing to stomp queue");
00374    }
00375 }
00376 
00377 void stomp_client::impl::handle_keepalive(const boost::system::error_code& err)
00378 {
00379    if (err != boost::asio::error::operation_aborted)
00380    {
00381       MLOG_CLASS_TRACE("sending keepalive connect to stomp queue");
00382       send_to_queue_async("CONNECT");
00383       keepalive();
00384    }
00385 }
00386 
00387 void stomp_client::impl::handle_reconnect(const boost::system::error_code& err)
00388 {
00389    if (err != boost::asio::error::operation_aborted)
00390    {
00391       reconnect();
00392    }
00393 }
00394 
00395 void stomp_client::impl::handle_dead_conn(const boost::system::error_code& err)
00396 {
00397    if (err != boost::asio::error::operation_aborted)
00398    {
00399       if (m_streams.max_msg_interval_exceeded())
00400       {
00401          MLOG_CLASS_INFO("max message interval exceeded, forcing reconnect");
00402          m_error_cb(make_error_code(error::connection_lost), "max message interval exceeded");
00403 
00404          // this will trigger a reconnect
00405          m_state = state::reconnecting;
00406          m_socket.close();
00407       }
00408       else
00409       {
00410          dead_conn_detect();
00411       }
00412    }
00413 }
00414 
00415 void stomp_client::impl::handle_stomp_error(const header_map& headers, const std::string& /*body*/)
00416 {
00417    header_map::const_iterator msg = headers.find("message");
00418 
00419    if (msg != headers.end())
00420    {
00421       header_map::const_iterator rcpt = headers.find("receipt-id");
00422 
00423       if (rcpt != headers.end())
00424       {
00425          if (rcpt->second.compare(0, 10, "subscribe:") == 0)
00426          {
00427             const std::string& topic = rcpt->second.substr(10);
00428             m_streams.erase(topic);
00429             m_error_cb(make_error_code(error::subscribe_failed), topic);
00430             return;
00431          }
00432       }
00433 
00434       m_error_cb(make_error_code(error::queue_error), msg->second);
00435    }
00436    else
00437    {
00438       m_error_cb(make_error_code(error::queue_error), "unknown queue error");
00439    }
00440 }
00441 
00442 void stomp_client::impl::handle_recv(const boost::system::error_code& err)
00443 {
00444    if (!is_connected())
00445    {
00446       return;
00447    }
00448 
00449    if (err)
00450    {
00451       if (m_state != state::reconnecting)
00452       {
00453          MLOG_CLASS_INFO("error while receiving from stomp queue: " << err.message());
00454          m_error_cb(make_error_code(error::connection_lost), err.message());
00455          m_state = state::reconnecting;
00456       }
00457 
00458       m_keepalive_timer.cancel();
00459       m_dead_conn_timer.cancel();
00460       reconnect();
00461 
00462       return;
00463    }
00464 
00465    std::string command, body;
00466    header_map headers;
00467    receive_from_queue(command, headers, body);
00468 
00469    MLOG_CLASS_TRACE("got " << command << " from stomp queue (headers: " << moost::utils::stringify(headers) << ")");
00470 
00471    if (command == "MESSAGE")
00472    {
00473       on_message(headers, body);
00474    }
00475    else if (command == "CONNECTED")
00476    {
00477       // ok, this is the normal keepalive response
00478    }
00479    else if (command == "ERROR")
00480    {
00481       handle_stomp_error(headers, body);
00482    }
00483    else if (command == "RECEIPT")
00484    {
00485       // there's no need to handle this, all went well
00486    }
00487    else
00488    {
00489       MLOG_CLASS_WARN("got unexpected " << command << " from stomp queue");
00490    }
00491 
00492    recv_more();
00493 }
00494 
00495 void stomp_client::impl::on_message(const header_map& headers, const std::string& msg)
00496 {
00497    header_map::const_iterator hit = headers.find("destination");
00498 
00499    if (hit != headers.end())
00500    {
00501       stomp_client::ack::type ack_type;
00502 
00503       if (m_streams.push_message(hit->second, msg, ack_type))
00504       {
00505          if (ack_type == stomp_client::ack::client)
00506          {
00507             hit = headers.find("message-id");
00508 
00509             if (hit != headers.end())
00510             {
00511                header_map ack_headers;
00512                ack_headers.insert(*hit);
00513                send_to_queue_async("ACK", ack_headers);
00514             }
00515          }
00516       }
00517       else
00518       {
00519          MLOG_CLASS_DEBUG("no stream found for topic: " + hit->second);
00520       }
00521    }
00522    else
00523    {
00524       MLOG_CLASS_WARN("skipping message without destination");
00525    }
00526 }
00527 
00528 void stomp_client::impl::recv_more()
00529 {
00530    boost::asio::async_read_until(m_socket, m_response, char(0),
00531                                  boost::bind(&stomp_client::impl::handle_recv, shared_from_this(),
00532                                              boost::asio::placeholders::error));
00533 }
00534 
00535 }}