libmoost
/home/mhx/git/github/libmoost/src/tools/mq/stomp_test_client.cpp
Go to the documentation of this file.
00001 /* vim:set ts=3 sw=3 sts=3 et: */
00028 #include <iostream>
00029 
00030 #include <boost/program_options.hpp>
00031 #include <boost/tokenizer.hpp>
00032 #include <boost/bind.hpp>
00033 
00034 #include "../../../include/moost/shell.hpp"
00035 #include "../../../include/moost/logging/global.hpp"
00036 #include "../../../include/moost/mq/stomp_client.h"
00037 #include "../../../include/moost/version.h"
00038 
00039 namespace po = boost::program_options;
00040 
00041 namespace {
00042 
00043    template <class I>
00044    void parse_arguments(const std::string& str, I iter)
00045    {
00046       boost::tokenizer< boost::escaped_list_separator<char> >
00047          tok(str, boost::escaped_list_separator<char>("\\", " \t", "'\""));
00048       std::copy(tok.begin(), tok.end(), iter);
00049    }
00050 
00051 }
00052 
00053 class stomp_test_client
00054 {
00055 public:
00056    stomp_test_client()
00057       : m_consumer_pool_size(1)
00058       , m_keepalive_interval(30.0)
00059       , m_reconnect_interval(1.0)
00060       , m_default_log_level("info")
00061    {
00062    }
00063 
00064    int run(int argc, char * argv[])
00065    {
00066       if (init(argc, argv))
00067       {
00068          process();
00069       }
00070 
00071       return 0;
00072    }
00073 
00074    std::string show_help() const
00075    {
00076       return
00077          "- connect           connect to queue\r\n"
00078          "- disconnect        disconnect from queue\r\n"
00079          "- subscribe         subscribe to topic\r\n"
00080          "- unsubscribe       unsubscribe from topic\r\n"
00081          "- status            connection status\r\n"
00082          "- num_processed     get number of processed messages\r\n"
00083          "- num_pending       get number of pending messages\r\n"
00084          "- send              send a message\r\n"
00085          "- reset             reset client object\r\n"
00086          "-------------------------------------------------------\r\n"
00087          " <cmd> --help       will show help for each command\r\n"
00088       ;
00089    }
00090 
00091    std::string get_prompt() const
00092    {
00093       std::ostringstream oss;
00094       oss << "stomp> ";
00095       return oss.str();
00096    }
00097 
00098    bool handle_command(std::string& rv, const std::string& cmd, const std::string& args)
00099    {
00100       cmd_method_t meth;
00101 
00102       if      (cmd == "connect")       { meth = &stomp_test_client::connect; }
00103       else if (cmd == "disconnect")    { meth = &stomp_test_client::disconnect; }
00104       else if (cmd == "subscribe")     { meth = &stomp_test_client::subscribe; }
00105       else if (cmd == "unsubscribe")   { meth = &stomp_test_client::unsubscribe; }
00106       else if (cmd == "status")        { meth = &stomp_test_client::status; }
00107       else if (cmd == "num_processed") { meth = &stomp_test_client::num_processed; }
00108       else if (cmd == "num_pending")   { meth = &stomp_test_client::num_pending; }
00109       else if (cmd == "send")          { meth = &stomp_test_client::send; }
00110       else if (cmd == "reset")         { meth = &stomp_test_client::reset; }
00111       else
00112       {
00113          return false;
00114       }
00115 
00116       std::vector<std::string> av;
00117       av.push_back(cmd);
00118 
00119       try
00120       {
00121          parse_arguments(args, std::back_inserter(av));
00122       }
00123       catch (const std::exception& e)
00124       {
00125          rv = std::string("parse error: ") + e.what() + "\n";
00126          return true;
00127       }
00128 
00129       std::ostringstream os;
00130 
00131       if (run_command(meth, rv, av, os))
00132       {
00133          rv = os.str();
00134       }
00135 
00136       return true;
00137    }
00138 
00139 private:
00140    typedef void (stomp_test_client::*cmd_method_t)(const std::vector<std::string>&, std::ostream&);
00141 
00142    size_t m_consumer_pool_size;
00143    float m_keepalive_interval;
00144    float m_reconnect_interval;
00145    std::string m_default_log_level;
00146    std::string m_logging_config;
00147 
00148    boost::shared_ptr<moost::mq::stomp_client> m_client;
00149 
00150    void show_help(po::options_description& opt) const
00151    {
00152       std::cout
00153          << "mq-stomp-test-client (" << LIBMOOST_REVISION_STR << ")\n"
00154          << "Build: " << __DATE__ << " (" << __TIME__ << ") " << LIBMOOST_COPYRIGHT_STR << "\n\n"
00155          << opt << std::endl;
00156 
00157       exit(0);
00158    }
00159 
00160    bool init(int argc, char * argv[])
00161    {
00162       po::options_description cmdline_options("Command line options");
00163       cmdline_options.add_options()
00164          ("consumer-pool-size", po::value<size_t>(&m_consumer_pool_size)->default_value(1), "consumer pool size")
00165          ("keepalive-interval", po::value<float>(&m_keepalive_interval)->default_value(30.0), "keepalive interval")
00166          ("reconnect-interval", po::value<float>(&m_reconnect_interval)->default_value(1.0), "reconnect interval")
00167          ("log-level,l", po::value<std::string>(&m_default_log_level)->default_value("info"), "default log level")
00168          ("logging-config", po::value<std::string>(&m_logging_config), "logging configuration file")
00169          ("help,h", "output help message and exit")
00170          ;
00171 
00172       po::variables_map vm;
00173 
00174       po::store(po::parse_command_line(argc, argv, cmdline_options), vm);
00175       po::notify(vm);
00176 
00177       if (vm.count("help"))
00178       {
00179          show_help(cmdline_options);
00180          return false;
00181       }
00182 
00183       return true;
00184    }
00185 
00186    void parse_options(const std::vector<std::string>& args, const po::options_description& od, const po::positional_options_description& pd, po::variables_map& vm)
00187    {
00188       std::vector<char *> argv;
00189       for (std::vector<std::string>::const_iterator it = args.begin(); it != args.end(); ++it)
00190       {
00191          argv.push_back(const_cast<char *>(it->c_str()));
00192       }
00193       // po::store(po::parse_command_line(argv.size(), &argv[0], od), vm);
00194       po::store(po::command_line_parser(argv.size(), &argv[0]).options(od).positional(pd).run(), vm);
00195       po::notify(vm);
00196    }
00197 
00198    void error_callback(const boost::system::error_code& ec, const std::string& str)
00199    {
00200       std::cerr << "queue error: " << ec.message() << " (" << str << ")" << std::endl;
00201    }
00202 
00203    void message_callback(const std::string& topic, const std::string& msg)
00204    {
00205       std::cout << "[" << topic << "] " << msg << std::endl;
00206    }
00207 
00208    bool simple_command_help(const std::string& name, const std::vector<std::string>& args, std::ostream& os)
00209    {
00210       po::options_description opt(name + " options");
00211       opt.add_options()
00212          ("help,h", "output help message")
00213          ;
00214 
00215       po::positional_options_description pos;
00216 
00217       po::variables_map vm;
00218 
00219       parse_options(args, opt, pos, vm);
00220 
00221       if (vm.count("help"))
00222       {
00223          os << opt << std::endl;
00224          return false;
00225       }
00226 
00227       return true;
00228    }
00229 
00230    template <typename Func>
00231    void simple_command(const std::string& name, Func func, const std::vector<std::string>& args, std::ostream& os)
00232    {
00233       if (simple_command_help(name, args, os))
00234       {
00235          func();
00236       }
00237    }
00238 
00239    void connect(const std::vector<std::string>& args, std::ostream& os)
00240    {
00241       std::string server;
00242       int port;
00243 
00244       po::options_description opt("connect options");
00245       opt.add_options()
00246          ("server,s", po::value<std::string>(&server), "server name")
00247          ("port,p", po::value<int>(&port)->default_value(61613), "port")
00248          ("help,h", "output help message")
00249          ;
00250 
00251       po::positional_options_description pos;
00252       pos.add("server", 1).add("port", 1);
00253 
00254       po::variables_map vm;
00255 
00256       parse_options(args, opt, pos, vm);
00257 
00258       if (vm.count("help") || !vm.count("server") || !vm.count("port"))
00259       {
00260          os << opt << std::endl;
00261          return;
00262       }
00263 
00264       m_client->connect(server, port, boost::bind(&stomp_test_client::error_callback, this, _1, _2));
00265    }
00266 
00267    void subscribe(const std::vector<std::string>& args, std::ostream& os)
00268    {
00269       std::string topic;
00270       bool ack = false;
00271       float max_msg_interval = -1.0f;
00272 
00273       po::options_description opt("subscribe options");
00274       opt.add_options()
00275          ("topic,t", po::value<std::string>(&topic), "topic")
00276          ("ack,a", po::value<bool>(&ack)->zero_tokens(), "acknowledge")
00277          ("max-msg-interval,m", po::value<float>(&max_msg_interval), "max message interval")
00278          ("help,h", "output help message")
00279          ;
00280 
00281       po::positional_options_description pos;
00282       pos.add("topic", 1);
00283 
00284       po::variables_map vm;
00285 
00286       parse_options(args, opt, pos, vm);
00287 
00288       if (vm.count("help") || !vm.count("topic"))
00289       {
00290          os << opt << std::endl;
00291          return;
00292       }
00293 
00294       m_client->subscribe(topic, boost::bind(&stomp_test_client::message_callback, this, topic, _1),
00295                           ack ? moost::mq::stomp_client::ack::automatic : moost::mq::stomp_client::ack::client,
00296                           max_msg_interval < 0.0f ? boost::posix_time::time_duration(boost::posix_time::pos_infin)
00297                                                   : boost::posix_time::milliseconds(1000*max_msg_interval));
00298    }
00299 
00300    void unsubscribe(const std::vector<std::string>& args, std::ostream& os)
00301    {
00302       std::string topic;
00303 
00304       po::options_description opt("unsubscribe options");
00305       opt.add_options()
00306          ("topic,t", po::value<std::string>(&topic), "topic")
00307          ("help,h", "output help message")
00308          ;
00309 
00310       po::positional_options_description pos;
00311       pos.add("topic", 1);
00312 
00313       po::variables_map vm;
00314 
00315       parse_options(args, opt, pos, vm);
00316 
00317       if (vm.count("help") || !vm.count("topic"))
00318       {
00319          os << opt << std::endl;
00320          return;
00321       }
00322 
00323       m_client->unsubscribe(topic);
00324    }
00325 
00326    void send(const std::vector<std::string>& args, std::ostream& os)
00327    {
00328       std::string topic;
00329       std::string message;
00330 
00331       po::options_description opt("send options");
00332       opt.add_options()
00333          ("topic,t", po::value<std::string>(&topic), "topic")
00334          ("message,m", po::value<std::string>(&message), "message")
00335          ("help,h", "output help message")
00336          ;
00337 
00338       po::positional_options_description pos;
00339       pos.add("topic", 1);
00340       pos.add("message", 2);
00341 
00342       po::variables_map vm;
00343 
00344       parse_options(args, opt, pos, vm);
00345 
00346       if (vm.count("help") || !vm.count("topic") || !vm.count("message"))
00347       {
00348          os << opt << std::endl;
00349          return;
00350       }
00351 
00352       m_client->send(topic, message);
00353    }
00354 
00355    void disconnect(const std::vector<std::string>& args, std::ostream& os)
00356    {
00357       simple_command("disconnect", boost::bind(&moost::mq::stomp_client::disconnect, m_client), args, os);
00358    }
00359 
00360    void reset(const std::vector<std::string>& args, std::ostream& os)
00361    {
00362       simple_command("reset", boost::bind(&stomp_test_client::reset_client, this), args, os);
00363    }
00364 
00365    void status(const std::vector<std::string>& args, std::ostream& os)
00366    {
00367       if (simple_command_help("status", args, os))
00368       {
00369          os << (m_client->is_connected() ? "connected" : "disconnected") << ", "
00370             << (m_client->is_online() ? "online" : "offline") << "\n";
00371       }
00372    }
00373 
00374    void num_processed(const std::vector<std::string>& args, std::ostream& os)
00375    {
00376       if (simple_command_help("num_processed", args, os))
00377       {
00378          os << "processed messages: " << m_client->get_num_processed() << "\n";
00379       }
00380    }
00381 
00382    void num_pending(const std::vector<std::string>& args, std::ostream& os)
00383    {
00384       if (simple_command_help("num_pending", args, os))
00385       {
00386          os << "pending messages: " << m_client->get_num_pending() << "\n";
00387       }
00388    }
00389 
00390    bool run_command(cmd_method_t meth, std::string& rv, const std::vector<std::string>& av, std::ostream& os)
00391    {
00392       try
00393       {
00394          (this->*meth)(av, os);
00395          return true;
00396       }
00397       catch (const std::exception& e)
00398       {
00399          rv = std::string("error: ") + e.what() + "\n";
00400       }
00401       catch (...)
00402       {
00403          rv = "unknown error during command execution\n";
00404       }
00405 
00406       return false;
00407    }
00408 
00409    void process()
00410    {
00411       moost::logging::global_singleton::instance().enable(boost::filesystem::path(m_logging_config), true);
00412       moost::shell<stomp_test_client> shell(*this, m_default_log_level);
00413       reset_client();
00414       shell.run();
00415       m_client.reset();
00416    }
00417 
00418    void reset_client()
00419    {
00420       m_client.reset(new moost::mq::stomp_client(m_consumer_pool_size,
00421                                                  boost::posix_time::milliseconds(1000*m_keepalive_interval),
00422                                                  boost::posix_time::milliseconds(1000*m_reconnect_interval)));
00423    }
00424 };
00425 
00426 int main(int argc, char **argv)
00427 {
00428    int retval = -1;
00429 
00430    try
00431    {
00432       retval = stomp_test_client().run(argc, argv);
00433    }
00434    catch(std::exception const & e)
00435    {
00436       std::cerr << "ERROR: " << e.what() << std::endl;
00437    }
00438    catch(...)
00439    {
00440       std::cerr << "ERROR: unknown error" << std::endl;
00441    }
00442 
00443    return retval;
00444 }