libmoost
|
00001 /* vim:set ts=3 sw=3 sts=3 et: */ 00028 #include <iostream> 00029 00030 #include <boost/program_options.hpp> 00031 #include <boost/tokenizer.hpp> 00032 #include <boost/bind.hpp> 00033 00034 #include "../../../include/moost/shell.hpp" 00035 #include "../../../include/moost/logging/global.hpp" 00036 #include "../../../include/moost/mq/stomp_client.h" 00037 #include "../../../include/moost/version.h" 00038 00039 namespace po = boost::program_options; 00040 00041 namespace { 00042 00043 template <class I> 00044 void parse_arguments(const std::string& str, I iter) 00045 { 00046 boost::tokenizer< boost::escaped_list_separator<char> > 00047 tok(str, boost::escaped_list_separator<char>("\\", " \t", "'\"")); 00048 std::copy(tok.begin(), tok.end(), iter); 00049 } 00050 00051 } 00052 00053 class stomp_test_client 00054 { 00055 public: 00056 stomp_test_client() 00057 : m_consumer_pool_size(1) 00058 , m_keepalive_interval(30.0) 00059 , m_reconnect_interval(1.0) 00060 , m_default_log_level("info") 00061 { 00062 } 00063 00064 int run(int argc, char * argv[]) 00065 { 00066 if (init(argc, argv)) 00067 { 00068 process(); 00069 } 00070 00071 return 0; 00072 } 00073 00074 std::string show_help() const 00075 { 00076 return 00077 "- connect connect to queue\r\n" 00078 "- disconnect disconnect from queue\r\n" 00079 "- subscribe subscribe to topic\r\n" 00080 "- unsubscribe unsubscribe from topic\r\n" 00081 "- status connection status\r\n" 00082 "- num_processed get number of processed messages\r\n" 00083 "- num_pending get number of pending messages\r\n" 00084 "- send send a message\r\n" 00085 "- reset reset client object\r\n" 00086 "-------------------------------------------------------\r\n" 00087 " <cmd> --help will show help for each command\r\n" 00088 ; 00089 } 00090 00091 std::string get_prompt() const 00092 { 00093 std::ostringstream oss; 00094 oss << "stomp> "; 00095 return oss.str(); 00096 } 00097 00098 bool handle_command(std::string& rv, const std::string& cmd, const std::string& args) 00099 { 00100 cmd_method_t meth; 00101 00102 if (cmd == "connect") { meth = &stomp_test_client::connect; } 00103 else if (cmd == "disconnect") { meth = &stomp_test_client::disconnect; } 00104 else if (cmd == "subscribe") { meth = &stomp_test_client::subscribe; } 00105 else if (cmd == "unsubscribe") { meth = &stomp_test_client::unsubscribe; } 00106 else if (cmd == "status") { meth = &stomp_test_client::status; } 00107 else if (cmd == "num_processed") { meth = &stomp_test_client::num_processed; } 00108 else if (cmd == "num_pending") { meth = &stomp_test_client::num_pending; } 00109 else if (cmd == "send") { meth = &stomp_test_client::send; } 00110 else if (cmd == "reset") { meth = &stomp_test_client::reset; } 00111 else 00112 { 00113 return false; 00114 } 00115 00116 std::vector<std::string> av; 00117 av.push_back(cmd); 00118 00119 try 00120 { 00121 parse_arguments(args, std::back_inserter(av)); 00122 } 00123 catch (const std::exception& e) 00124 { 00125 rv = std::string("parse error: ") + e.what() + "\n"; 00126 return true; 00127 } 00128 00129 std::ostringstream os; 00130 00131 if (run_command(meth, rv, av, os)) 00132 { 00133 rv = os.str(); 00134 } 00135 00136 return true; 00137 } 00138 00139 private: 00140 typedef void (stomp_test_client::*cmd_method_t)(const std::vector<std::string>&, std::ostream&); 00141 00142 size_t m_consumer_pool_size; 00143 float m_keepalive_interval; 00144 float m_reconnect_interval; 00145 std::string m_default_log_level; 00146 std::string m_logging_config; 00147 00148 boost::shared_ptr<moost::mq::stomp_client> m_client; 00149 00150 void show_help(po::options_description& opt) const 00151 { 00152 std::cout 00153 << "mq-stomp-test-client (" << LIBMOOST_REVISION_STR << ")\n" 00154 << "Build: " << __DATE__ << " (" << __TIME__ << ") " << LIBMOOST_COPYRIGHT_STR << "\n\n" 00155 << opt << std::endl; 00156 00157 exit(0); 00158 } 00159 00160 bool init(int argc, char * argv[]) 00161 { 00162 po::options_description cmdline_options("Command line options"); 00163 cmdline_options.add_options() 00164 ("consumer-pool-size", po::value<size_t>(&m_consumer_pool_size)->default_value(1), "consumer pool size") 00165 ("keepalive-interval", po::value<float>(&m_keepalive_interval)->default_value(30.0), "keepalive interval") 00166 ("reconnect-interval", po::value<float>(&m_reconnect_interval)->default_value(1.0), "reconnect interval") 00167 ("log-level,l", po::value<std::string>(&m_default_log_level)->default_value("info"), "default log level") 00168 ("logging-config", po::value<std::string>(&m_logging_config), "logging configuration file") 00169 ("help,h", "output help message and exit") 00170 ; 00171 00172 po::variables_map vm; 00173 00174 po::store(po::parse_command_line(argc, argv, cmdline_options), vm); 00175 po::notify(vm); 00176 00177 if (vm.count("help")) 00178 { 00179 show_help(cmdline_options); 00180 return false; 00181 } 00182 00183 return true; 00184 } 00185 00186 void parse_options(const std::vector<std::string>& args, const po::options_description& od, const po::positional_options_description& pd, po::variables_map& vm) 00187 { 00188 std::vector<char *> argv; 00189 for (std::vector<std::string>::const_iterator it = args.begin(); it != args.end(); ++it) 00190 { 00191 argv.push_back(const_cast<char *>(it->c_str())); 00192 } 00193 // po::store(po::parse_command_line(argv.size(), &argv[0], od), vm); 00194 po::store(po::command_line_parser(argv.size(), &argv[0]).options(od).positional(pd).run(), vm); 00195 po::notify(vm); 00196 } 00197 00198 void error_callback(const boost::system::error_code& ec, const std::string& str) 00199 { 00200 std::cerr << "queue error: " << ec.message() << " (" << str << ")" << std::endl; 00201 } 00202 00203 void message_callback(const std::string& topic, const std::string& msg) 00204 { 00205 std::cout << "[" << topic << "] " << msg << std::endl; 00206 } 00207 00208 bool simple_command_help(const std::string& name, const std::vector<std::string>& args, std::ostream& os) 00209 { 00210 po::options_description opt(name + " options"); 00211 opt.add_options() 00212 ("help,h", "output help message") 00213 ; 00214 00215 po::positional_options_description pos; 00216 00217 po::variables_map vm; 00218 00219 parse_options(args, opt, pos, vm); 00220 00221 if (vm.count("help")) 00222 { 00223 os << opt << std::endl; 00224 return false; 00225 } 00226 00227 return true; 00228 } 00229 00230 template <typename Func> 00231 void simple_command(const std::string& name, Func func, const std::vector<std::string>& args, std::ostream& os) 00232 { 00233 if (simple_command_help(name, args, os)) 00234 { 00235 func(); 00236 } 00237 } 00238 00239 void connect(const std::vector<std::string>& args, std::ostream& os) 00240 { 00241 std::string server; 00242 int port; 00243 00244 po::options_description opt("connect options"); 00245 opt.add_options() 00246 ("server,s", po::value<std::string>(&server), "server name") 00247 ("port,p", po::value<int>(&port)->default_value(61613), "port") 00248 ("help,h", "output help message") 00249 ; 00250 00251 po::positional_options_description pos; 00252 pos.add("server", 1).add("port", 1); 00253 00254 po::variables_map vm; 00255 00256 parse_options(args, opt, pos, vm); 00257 00258 if (vm.count("help") || !vm.count("server") || !vm.count("port")) 00259 { 00260 os << opt << std::endl; 00261 return; 00262 } 00263 00264 m_client->connect(server, port, boost::bind(&stomp_test_client::error_callback, this, _1, _2)); 00265 } 00266 00267 void subscribe(const std::vector<std::string>& args, std::ostream& os) 00268 { 00269 std::string topic; 00270 bool ack = false; 00271 float max_msg_interval = -1.0f; 00272 00273 po::options_description opt("subscribe options"); 00274 opt.add_options() 00275 ("topic,t", po::value<std::string>(&topic), "topic") 00276 ("ack,a", po::value<bool>(&ack)->zero_tokens(), "acknowledge") 00277 ("max-msg-interval,m", po::value<float>(&max_msg_interval), "max message interval") 00278 ("help,h", "output help message") 00279 ; 00280 00281 po::positional_options_description pos; 00282 pos.add("topic", 1); 00283 00284 po::variables_map vm; 00285 00286 parse_options(args, opt, pos, vm); 00287 00288 if (vm.count("help") || !vm.count("topic")) 00289 { 00290 os << opt << std::endl; 00291 return; 00292 } 00293 00294 m_client->subscribe(topic, boost::bind(&stomp_test_client::message_callback, this, topic, _1), 00295 ack ? moost::mq::stomp_client::ack::automatic : moost::mq::stomp_client::ack::client, 00296 max_msg_interval < 0.0f ? boost::posix_time::time_duration(boost::posix_time::pos_infin) 00297 : boost::posix_time::milliseconds(1000*max_msg_interval)); 00298 } 00299 00300 void unsubscribe(const std::vector<std::string>& args, std::ostream& os) 00301 { 00302 std::string topic; 00303 00304 po::options_description opt("unsubscribe options"); 00305 opt.add_options() 00306 ("topic,t", po::value<std::string>(&topic), "topic") 00307 ("help,h", "output help message") 00308 ; 00309 00310 po::positional_options_description pos; 00311 pos.add("topic", 1); 00312 00313 po::variables_map vm; 00314 00315 parse_options(args, opt, pos, vm); 00316 00317 if (vm.count("help") || !vm.count("topic")) 00318 { 00319 os << opt << std::endl; 00320 return; 00321 } 00322 00323 m_client->unsubscribe(topic); 00324 } 00325 00326 void send(const std::vector<std::string>& args, std::ostream& os) 00327 { 00328 std::string topic; 00329 std::string message; 00330 00331 po::options_description opt("send options"); 00332 opt.add_options() 00333 ("topic,t", po::value<std::string>(&topic), "topic") 00334 ("message,m", po::value<std::string>(&message), "message") 00335 ("help,h", "output help message") 00336 ; 00337 00338 po::positional_options_description pos; 00339 pos.add("topic", 1); 00340 pos.add("message", 2); 00341 00342 po::variables_map vm; 00343 00344 parse_options(args, opt, pos, vm); 00345 00346 if (vm.count("help") || !vm.count("topic") || !vm.count("message")) 00347 { 00348 os << opt << std::endl; 00349 return; 00350 } 00351 00352 m_client->send(topic, message); 00353 } 00354 00355 void disconnect(const std::vector<std::string>& args, std::ostream& os) 00356 { 00357 simple_command("disconnect", boost::bind(&moost::mq::stomp_client::disconnect, m_client), args, os); 00358 } 00359 00360 void reset(const std::vector<std::string>& args, std::ostream& os) 00361 { 00362 simple_command("reset", boost::bind(&stomp_test_client::reset_client, this), args, os); 00363 } 00364 00365 void status(const std::vector<std::string>& args, std::ostream& os) 00366 { 00367 if (simple_command_help("status", args, os)) 00368 { 00369 os << (m_client->is_connected() ? "connected" : "disconnected") << ", " 00370 << (m_client->is_online() ? "online" : "offline") << "\n"; 00371 } 00372 } 00373 00374 void num_processed(const std::vector<std::string>& args, std::ostream& os) 00375 { 00376 if (simple_command_help("num_processed", args, os)) 00377 { 00378 os << "processed messages: " << m_client->get_num_processed() << "\n"; 00379 } 00380 } 00381 00382 void num_pending(const std::vector<std::string>& args, std::ostream& os) 00383 { 00384 if (simple_command_help("num_pending", args, os)) 00385 { 00386 os << "pending messages: " << m_client->get_num_pending() << "\n"; 00387 } 00388 } 00389 00390 bool run_command(cmd_method_t meth, std::string& rv, const std::vector<std::string>& av, std::ostream& os) 00391 { 00392 try 00393 { 00394 (this->*meth)(av, os); 00395 return true; 00396 } 00397 catch (const std::exception& e) 00398 { 00399 rv = std::string("error: ") + e.what() + "\n"; 00400 } 00401 catch (...) 00402 { 00403 rv = "unknown error during command execution\n"; 00404 } 00405 00406 return false; 00407 } 00408 00409 void process() 00410 { 00411 moost::logging::global_singleton::instance().enable(boost::filesystem::path(m_logging_config), true); 00412 moost::shell<stomp_test_client> shell(*this, m_default_log_level); 00413 reset_client(); 00414 shell.run(); 00415 m_client.reset(); 00416 } 00417 00418 void reset_client() 00419 { 00420 m_client.reset(new moost::mq::stomp_client(m_consumer_pool_size, 00421 boost::posix_time::milliseconds(1000*m_keepalive_interval), 00422 boost::posix_time::milliseconds(1000*m_reconnect_interval))); 00423 } 00424 }; 00425 00426 int main(int argc, char **argv) 00427 { 00428 int retval = -1; 00429 00430 try 00431 { 00432 retval = stomp_test_client().run(argc, argv); 00433 } 00434 catch(std::exception const & e) 00435 { 00436 std::cerr << "ERROR: " << e.what() << std::endl; 00437 } 00438 catch(...) 00439 { 00440 std::cerr << "ERROR: unknown error" << std::endl; 00441 } 00442 00443 return retval; 00444 }