libmoost
|
00001 /* vim:set ts=3 sw=3 sts=3 et: */ 00028 #ifndef MOOST_PSQL_PGQ_CONSUMER_HPP__ 00029 #define MOOST_PSQL_PGQ_CONSUMER_HPP__ 00030 00031 #include <string> 00032 #include <sstream> 00033 #include <cstring> 00034 00035 #include <pqxx/pqxx> 00036 00037 namespace moost { 00038 namespace psql { 00039 00041 class pgq_consumer 00042 { 00043 public: 00049 pgq_consumer(std::string const &dbconn, 00050 std::string const &queue_name, 00051 std::string const &consumer_name) 00052 : m_dbconn(dbconn) 00053 , m_queue_name(queue_name) 00054 , m_consumer_name(consumer_name) 00055 { 00056 } 00057 00067 bool register_consumer() const 00068 { 00069 pqxx::connection conn(m_dbconn); 00070 std::stringstream query; 00071 query << "select pgq.register_consumer(" 00072 << conn.quote(m_queue_name) << ',' 00073 << conn.quote(m_consumer_name) << ");"; 00074 00075 pqxx::work transaction(conn, "RegisterPgqConsumer"); 00076 pqxx::result res = transaction.exec(query); 00077 // If no exception has been thrown, the consumer is registered now. 00078 00080 bool const return_value = res.size() == 1 && res[0][0].as(0); 00081 00082 transaction.commit(); 00083 00084 return return_value; 00085 } 00086 00097 bool unregister_consumer() const 00098 { 00099 pqxx::connection conn(m_dbconn); 00100 00102 bool return_value = false; 00103 00104 try 00105 { 00106 std::stringstream query; 00107 query << "select pgq.unregister_consumer(" 00108 << conn.quote(m_queue_name) << ',' 00109 << conn.quote(m_consumer_name) << ");"; 00110 00111 pqxx::work transaction(conn, "UnregisterPgqConsumer"); 00112 pqxx::result res = transaction.exec(query); 00113 00114 return_value = res.size() == 1 && res[0][0].as(0); 00115 00116 transaction.commit(); 00117 } 00118 catch (pqxx::sql_error const & e) 00119 { 00120 if (!std::strstr(e.what(), "consumer not registered on queue")) 00121 { 00122 throw; 00123 } 00124 } 00125 00126 return return_value; 00127 } 00128 00137 bool is_registered() const 00138 { 00139 pqxx::connection conn(m_dbconn); 00140 std::stringstream query; 00141 query << "select 1 as ok from pgq.get_consumer_info() where queue_name=" 00142 << conn.quote(m_queue_name) << " and consumer_name=" 00143 << conn.quote(m_consumer_name) << ';'; 00144 00145 pqxx::work transaction(conn, "CheckPgqConsumer"); 00146 pqxx::result res = transaction.exec(query); 00147 00148 return res.size() == 1 && res[0][0].as(0); 00149 } 00150 00160 template<class FunctorType> 00161 void poll(char const * columns, FunctorType const & functor) const 00162 { 00163 pqxx::connection conn(m_dbconn); 00164 00165 for(;; ) 00166 { 00167 std::stringstream query; 00168 query << "select next_batch from pgq.next_batch(" 00169 << conn.quote(m_queue_name) << ',' 00170 << conn.quote(m_consumer_name) << ");"; 00171 00172 pqxx::work transaction1(conn, "PollPgq1"); 00173 pqxx::result res = transaction1.exec(query); 00174 00175 if (res.empty() || res.front()[0].is_null()) 00176 { 00177 transaction1.commit(); 00178 return; 00179 } 00180 00181 long next_batch; 00182 res.front()[0].to(next_batch); 00183 transaction1.commit(); 00184 00185 query.str(std::string()); 00186 query << "select " << columns << " from pgq.get_batch_events(" << next_batch << ");"; 00187 00188 pqxx::work transaction2(conn, "PollPgq2"); 00189 00190 functor(transaction2.exec(query)); 00191 00192 query.str(std::string()); 00193 query << "select pgq.finish_batch(" << next_batch << ");"; 00194 transaction2.exec(query); 00195 transaction2.commit(); 00196 } 00197 } 00198 00203 std::string const & dbconn() const 00204 { 00205 return m_dbconn; 00206 } 00207 00212 std::string const & queue_name() const 00213 { 00214 return m_queue_name; 00215 } 00216 00221 std::string const & consumer_name() const 00222 { 00223 return m_consumer_name; 00224 } 00225 00226 private: 00227 std::string const m_dbconn; 00228 std::string const m_queue_name; 00229 std::string const m_consumer_name; 00230 }; 00231 00232 } // namespace 00233 } // namespace 00234 00235 #endif // ifndef MOOST_PSQL_PGQ_CONSUMER_HPP__