libmoost
/home/mhx/git/github/libmoost/include/moost/psql/pgq.hpp
Go to the documentation of this file.
00001 /* vim:set ts=3 sw=3 sts=3 et: */
00028 #ifndef MOOST_PSQL_PGQ_CONSUMER_HPP__
00029 #define MOOST_PSQL_PGQ_CONSUMER_HPP__
00030 
00031 #include <string>
00032 #include <sstream>
00033 #include <cstring>
00034 
00035 #include <pqxx/pqxx>
00036 
00037 namespace moost {
00038 namespace psql {
00039 
00041 class pgq_consumer
00042 {
00043 public:
00049     pgq_consumer(std::string const &dbconn,
00050                  std::string const &queue_name,
00051                  std::string const &consumer_name)
00052         : m_dbconn(dbconn)
00053         , m_queue_name(queue_name)
00054         , m_consumer_name(consumer_name)
00055     {
00056     }
00057 
00067     bool register_consumer() const
00068     {
00069         pqxx::connection conn(m_dbconn);
00070         std::stringstream query;
00071         query << "select pgq.register_consumer("
00072               << conn.quote(m_queue_name) << ','
00073               << conn.quote(m_consumer_name) << ");";
00074 
00075         pqxx::work transaction(conn, "RegisterPgqConsumer");
00076         pqxx::result res = transaction.exec(query);
00077         // If no exception has been thrown, the consumer is registered now.
00078 
00080         bool const return_value = res.size() == 1 && res[0][0].as(0);
00081 
00082         transaction.commit();
00083 
00084         return return_value;
00085     }
00086 
00097     bool unregister_consumer() const
00098     {
00099         pqxx::connection conn(m_dbconn);
00100 
00102         bool return_value = false;
00103 
00104         try
00105         {
00106             std::stringstream query;
00107             query << "select pgq.unregister_consumer("
00108                   << conn.quote(m_queue_name) << ','
00109                   << conn.quote(m_consumer_name) << ");";
00110 
00111             pqxx::work transaction(conn, "UnregisterPgqConsumer");
00112             pqxx::result res = transaction.exec(query);
00113 
00114             return_value = res.size() == 1 && res[0][0].as(0);
00115 
00116             transaction.commit();
00117         }
00118         catch (pqxx::sql_error const & e)
00119         {
00120             if (!std::strstr(e.what(), "consumer not registered on queue"))
00121             {
00122                 throw;
00123             }
00124         }
00125 
00126         return return_value;
00127     }
00128 
00137     bool is_registered() const
00138     {
00139         pqxx::connection conn(m_dbconn);
00140         std::stringstream query;
00141         query << "select 1 as ok from pgq.get_consumer_info() where queue_name="
00142               << conn.quote(m_queue_name) << " and consumer_name="
00143               << conn.quote(m_consumer_name) << ';';
00144 
00145         pqxx::work transaction(conn, "CheckPgqConsumer");
00146         pqxx::result res = transaction.exec(query);
00147 
00148         return res.size() == 1 && res[0][0].as(0);
00149     }
00150 
00160     template<class FunctorType>
00161     void poll(char const * columns, FunctorType const & functor) const
00162     {
00163         pqxx::connection conn(m_dbconn);
00164 
00165         for(;; )
00166         {
00167             std::stringstream query;
00168             query << "select next_batch from pgq.next_batch("
00169                   << conn.quote(m_queue_name) << ','
00170                   << conn.quote(m_consumer_name) << ");";
00171 
00172             pqxx::work transaction1(conn, "PollPgq1");
00173             pqxx::result res = transaction1.exec(query);
00174 
00175             if (res.empty() || res.front()[0].is_null())
00176             {
00177                 transaction1.commit();
00178                 return;
00179             }
00180 
00181             long next_batch;
00182             res.front()[0].to(next_batch);
00183             transaction1.commit();
00184 
00185             query.str(std::string());
00186             query << "select " << columns << " from pgq.get_batch_events(" << next_batch << ");";
00187 
00188             pqxx::work transaction2(conn, "PollPgq2");
00189 
00190             functor(transaction2.exec(query));
00191 
00192             query.str(std::string());
00193             query << "select pgq.finish_batch(" << next_batch << ");";
00194             transaction2.exec(query);
00195             transaction2.commit();
00196         }
00197     }
00198 
00203     std::string const & dbconn() const
00204     {
00205         return m_dbconn;
00206     }
00207 
00212     std::string const & queue_name() const
00213     {
00214         return m_queue_name;
00215     }
00216 
00221     std::string const & consumer_name() const
00222     {
00223         return m_consumer_name;
00224     }
00225 
00226 private:
00227     std::string const m_dbconn;
00228     std::string const m_queue_name;
00229     std::string const m_consumer_name;
00230 };
00231 
00232 } // namespace
00233 } // namespace
00234 
00235 #endif // ifndef MOOST_PSQL_PGQ_CONSUMER_HPP__