libmoost
/home/mhx/git/github/libmoost/include/moost/kvds/kvds_page_store.hpp
Go to the documentation of this file.
00001 /* vim:set ts=3 sw=3 sts=3 et: */
00028 #ifndef MOOST_KVDS_KVDS_PAGESTORE_HPP__
00029 #define MOOST_KVDS_KVDS_PAGESTORE_HPP__
00030 
00035 
#include <cassert>
#include <csignal>
#include <cstring>

#include <algorithm>
#include <bitset>
#include <fstream>
#include <limits>
#include <sstream>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>
00045 
00046 #include <boost/cstdint.hpp>
00047 #include <boost/filesystem.hpp>
00048 #include <boost/shared_ptr.hpp>
00049 #include <boost/cast.hpp>
00050 
00051 #include <boost/archive/binary_oarchive.hpp>
00052 #include <boost/archive/binary_iarchive.hpp>
00053 #include <boost/serialization/utility.hpp>
00054 
00055 #include "../utils/bits.hpp"
00056 #include "../algorithm/fast_hash.hpp"
00057 #include "../container/sparse_hash_map.hpp"
00058 #include "../container/sparse_hash_set.hpp"
00059 
00060 #include "../serialization/hashmap_serializer.hpp"
00061 #include "../serialization/hashset_serializer.hpp"
00062 
00063 #include "ikvds.hpp"
00064 
// KVDS_PAGESTORE_FLUSH_STREAM__(S): in debug builds (NDEBUG not defined) this
// flushes stream S; in release builds it expands to nothing. It is used between
// the individual writes of an index record, presumably so partially written
// data reaches disk while debugging.
#ifndef NDEBUG
#define KVDS_PAGESTORE_FLUSH_STREAM__(S) S.flush()
#else
#define KVDS_PAGESTORE_FLUSH_STREAM__(S)
#endif
00071 
00075 
00076 namespace moost { namespace kvds {
00077 
   // Raw byte buffer. Used as the key type of KvdsPageMapNonIntrinsicKey and as
   // the argument type of KvdsPageMapDefaultKeyHashFunctor.
   typedef std::vector<char> byte_array_t;
00079 
00084 
00087 
00088    template <typename pagemapT>
00089    class KvdsPageMapShared
00090    {
00091    public:
00092       typedef pagemapT pagemap_t;
00093       typedef typename pagemap_t::size_type size_type;
00094       typedef typename pagemap_t::iterator iterator;
00095       typedef typename pagemap_t::const_iterator const_iterator;
00096       typedef typename pagemap_t::pageinfo_t pageinfo_t;
00097       typedef typename pagemap_t::itemid_t itemid_t;
00098       typedef typename pagemap_t::storage_t storage_t;
00099       typedef KvdsPageMapShared<pagemapT> this_type;
00100 
00101    private:
00102 
00108       struct metadata
00109       {
00110          typedef KvdsPageMapShared<pagemapT> shared_pagemap_t;
00111          typedef typename shared_pagemap_t::pagemap_t pagemap_t;
00112          typedef std::vector<shared_pagemap_t *> shared_pagemaps_t;
00113 
00114          metadata(shared_pagemap_t * pshared) :
00115             isdirty(0), ppagemap(new pagemap_t), pshared_pagemaps(new shared_pagemaps_t)
00116          {
00117             typename shared_pagemaps_t::iterator itr = std::find(pshared_pagemaps->begin(), pshared_pagemaps->end(), pshared);
00118 
00123 
00124             assert(itr == pshared_pagemaps->end());
00125 
00129             if(itr == pshared_pagemaps->end())
00130             {
00131                pshared_pagemaps->push_back(pshared);
00132             }
00133          }
00134 
00135          bool is_owner(shared_pagemap_t * pshared)
00136          {
00137             // the first item in the list is always considered the owner. If the owner
00138             // is removed (when it is destroyed) the next item will become the owner.
00139             return !pshared_pagemaps->empty() && *(pshared_pagemaps->begin()) == pshared;
00140          }
00141 
00142 
00143 
00145          void remove(shared_pagemap_t * pshared)
00146          {
00147             std::remove(pshared_pagemaps->begin(), pshared_pagemaps->end(), pshared);
00148          }
00149 
00151          volatile std::sig_atomic_t isdirty;
00152 
00155          boost::shared_ptr<pagemap_t> ppagemap;
00156 
00159          boost::shared_ptr<shared_pagemaps_t>  pshared_pagemaps;
00160       };
00161 
00162       boost::shared_ptr<metadata> pmetadata_;
00163 
00164    public:
00165 
00166       ~KvdsPageMapShared()
00167       {
00168          if(pmetadata_)
00169          {
00170             pmetadata_->remove(this);
00171          }
00172       }
00173 
00175       void get_metadata_from(this_type const & rhs)
00176       {
00177          // make sure there is something to share
00178          rhs.validate_metadata_exists();
00179 
00180          // it's good to share
00181          pmetadata_ = rhs.pmetadata_;
00182       }
00183 
00185       void give_metadata_to(this_type const & rhs)
00186       {
00187          // make sure there is something to share
00188          validate_metadata_exists();
00189 
00190          // it's good to share
00191          rhs.pmetadata_ = pmetadata_;
00192       }
00193 
00194       storage_t & get_storage()
00195       {
00196          // use responsibly, remember using this function is NOT thread safe!
00197          create_metadata_ondemand();
00198          return pmetadata_->ppagemap->get_storage();
00199       }
00200 
00204 
00205       iterator begin()
00206       {
00207          create_metadata_ondemand();
00208 
00209          // since the iterator is non-const we have to assume the store is now dirty
00210          pmetadata_->isdirty = 1;
00211          return pmetadata_->ppagemap->begin();
00212       }
00213 
00214       iterator end()
00215       {
00216          create_metadata_ondemand();
00217          return pmetadata_->ppagemap->end();
00218       }
00219 
00220       size_type size() const
00221       {
00222          return pmetadata_ ? pmetadata_->ppagemap->size() : 0;
00223       }
00224 
00225       void resize(size_type const size)
00226       {
00227          create_metadata_ondemand();
00228          pmetadata_->ppagemap->resize(size);
00229       }
00230 
00231       bool empty() const
00232       {
00233          return pmetadata_ ? pmetadata_->ppagemap->empty() : true;
00234       }
00235 
00236       void clear()
00237       {
00238          if(pmetadata_)
00239          {
00240             pmetadata_->isdirty = 1;
00241             pmetadata_->ppagemap->clear();
00242          }
00243       }
00244 
00245       iterator find (
00246          void const * pkey, size_t const ksize
00247          )
00248       {
00249          create_metadata_ondemand();
00250          iterator itr = pmetadata_->ppagemap->find(pkey, ksize);
00251 
00252          if(itr != end())
00253          {
00254             // since the iterator is non-const we have to assume the store is now dirty
00255             pmetadata_->isdirty = 1;
00256          }
00257 
00258          return itr;
00259       }
00260 
00261       void erase(iterator itr)
00262       {
00263          if(pmetadata_)
00264          {
00265             pmetadata_->isdirty = 1;
00266             pmetadata_->ppagemap->erase(itr);
00267          }
00268       }
00269 
00270       iterator insert(
00271          void const * pkey, size_t const ksize,
00272          pageinfo_t const & page_info
00273          )
00274       {
00275          create_metadata_ondemand();
00276          pmetadata_->isdirty = 1;
00277          return pmetadata_->ppagemap->insert(pkey, ksize, page_info);
00278       }
00279 
00280       bool itr2key(
00281          const_iterator itr,
00282          void * pkey, size_t & ksize
00283          ) const
00284       {
00285          return pmetadata_ ? pmetadata_->ppagemap->itr2key(itr, pkey, ksize) : false;
00286       }
00287 
00288       void load(std::string const & fname, bool newdb)
00289       {
00290          // if I'm the owner I'll load the data
00291          if(create_metadata_ondemand() && pmetadata_->is_owner(this))
00292          {
00293             pmetadata_->isdirty = !boost::filesystem::exists(fname) || newdb ? 1 : 0;
00294             pmetadata_->ppagemap->load(fname, newdb);
00295          }
00296       }
00297 
00298       void save(std::string const & fname)
00299       {
00300          // if there is metadata and we're the store ownder and the store is dirty then save.
00301          if(pmetadata_ && pmetadata_->is_owner(this) && pmetadata_->isdirty)
00302          {
00303             pmetadata_->ppagemap->save(fname);
00304          }
00305       }
00306 
00307    private:
00308       void validate_metadata_exists() const
00309       {
00310          if(!pmetadata_) { throw std::runtime_error("shared pagemap metadata not found"); }
00311       }
00312 
00317       bool create_metadata_ondemand()
00318       {
00319          if(!pmetadata_)
00320          {
00321             pmetadata_.reset(new metadata(this));
00322             return true;
00323          }
00324 
00325          return false;
00326       }
00327    };
00328 
00330 
00332 
00333    template <typename keyT>
00334    class KvdsPageMapIntrinsicKey
00335    {
00336    public:
00337       typedef boost::uint8_t storeid_t;
00338       typedef boost::uint32_t itemid_t; // 4 billion items per store should be enough for anyone.
00339       typedef std::pair<storeid_t, itemid_t> pageinfo_t;
00340       typedef keyT key_type;
00341       typedef moost::container::sparse_hash_map<key_type, pageinfo_t> storage_t;
00342       typedef typename storage_t::size_type size_type;
00343       typedef typename storage_t::iterator iterator;
00344       typedef typename storage_t::const_iterator const_iterator;
00345 
00346    public:
00347       KvdsPageMapIntrinsicKey()
00348       {
00349          // Since we support deletions there must always be a deleted key.
00350          // By default it'll be the max value that can be represented by
00351          // the key type. This can be changed by making a call to "get_storage"
00352          // to get a handle to the underlying storage and then setting a new value.
00353          storage_.set_deleted_key(std::numeric_limits<key_type>::max());
00354 
00355          // set an arbitrary starting size for the sparse hashmap. Failure to set a
00356          // reasonable size won't cause a fail but it will cause a performance hit.
00357          // Since sparse hashmap is very good in terms of memory usage we can affort
00358          // to push the boat out a little and set a reasonable default starting size.
00359          storage_.resize(0xFFFFFF);
00360       }
00361 
00364       // and yet different stores may have different requirements. For example the
00366       storage_t & get_storage()
00367       {
00368          return storage_;
00369       }
00370 
00374 
00375       iterator begin() { return storage_.begin(); }
00376 
00377       iterator end() { return storage_.end(); }
00378 
00379       size_type size() const { return storage_.size(); }
00380 
00381       void resize(size_type const size) { storage_.resize(size); }
00382 
00383       bool empty() const { return storage_.empty(); }
00384 
00385       void clear() { storage_.clear(); }
00386 
00387       iterator find (
00388          void const * pkey, size_t const /*ksize*/
00389          )
00390       {
00391          key_type key = *((key_type*)pkey);
00392          return storage_.find(key);
00393       }
00394 
00395       void erase(iterator itr)
00396       {
00397          assert(itr != storage_.end());
00398          storage_.erase(itr);
00399       }
00400 
00401       iterator insert(
00402          void const * pkey, size_t const /*ksize*/,
00403          pageinfo_t const & page_info
00404          )
00405       {
00406          key_type key = *((key_type*)pkey);
00407          return storage_.insert(std::make_pair(key, page_info)).first;
00408       }
00409 
00411       bool itr2key(
00412          const_iterator itr,
00413          void * pkey, size_t & ksize
00414          ) const
00415       {
00416          bool ok = false;
00417          size_t const size = sizeof(itr->first);
00418 
00419          if(size <= ksize)
00420          {
00421             memcpy(pkey, &itr->first, size);
00422             ok = true;
00423          }
00424 
00425          ksize = size;
00426 
00427          return ok;
00428       }
00429 
00430       void load(std::string const & fname, bool newdb)
00431       {
00432          if(newdb)
00433          {
00434             boost::filesystem::remove(fname);
00435 
00436          }
00437          else
00438          // only try to load it if it exists
00439          if(boost::filesystem::exists(fname))
00440          {
00441             // archive takes care of any io problems for us
00442             std::ifstream in(fname.c_str(), std::ios::binary);
00443             boost::archive::binary_iarchive ar(in);
00444             ar >> storage_;
00445          }
00446       }
00447 
00448       void save(std::string const & fname)
00449       {
00450          // archive takes care of any io problems for us
00451          std::ofstream out(fname.c_str(), std::ios::binary);
00452          boost::archive::binary_oarchive ar(out);
00453          ar << storage_;
00454       }
00455 
00456    private:
00457       storage_t storage_;
00458    };
00459 
00460 
00462    struct KvdsPageMapDefaultKeyHashFunctor : public std::unary_function< byte_array_t, size_t >
00463    {
00464       size_t operator()(const byte_array_t & key) const
00465       {
00466          return moost::algorithm::fast_hash(&key[0], key.size() * sizeof(byte_array_t::value_type));
00467       }
00468    };
00469 
00475 
00477 
00478    template <typename KeyHashFunctorT = KvdsPageMapDefaultKeyHashFunctor>
00479    class KvdsPageMapNonIntrinsicKey
00480    {
00481    public:
00482       typedef boost::uint8_t storeid_t;
00483       typedef boost::uint32_t itemid_t; // 4 billion items per store should be enough for anyone.
00484       typedef std::pair<storeid_t, itemid_t> pageinfo_t;
00485       typedef KeyHashFunctorT KeyHashFunctor;
00486       typedef moost::container::sparse_hash_map<byte_array_t, pageinfo_t, KeyHashFunctor> storage_t;
00487       typedef typename storage_t::key_type key_type;
00488       typedef typename storage_t::size_type size_type;
00489       typedef typename storage_t::iterator iterator;
00490       typedef typename storage_t::const_iterator const_iterator;
00491 
00492    public:
00493       KvdsPageMapNonIntrinsicKey()
00494       {
00495          // Since we support deletions there must always be a deleted key.
00496          // By default it'll be an empty key (remember a key is just a vector
00497          // of bytes). This can be changed by making a call to "get_storage"
00498          // to get a handle to the underlying storage and then setting a new value.
00499          storage_.set_deleted_key(key_type());
00500 
00501          // set an arbitrary starting size for the sparse hashmap. Failure to set a
00502          // reasonable size won't cause a fail but it will cause a performance hit.
00503          // Since sparse hashmap is very good in terms of memory usage we can affort
00504          // to push the boat out a little and set a reasonable default starting size.
00505          storage_.resize(0xFFFFFF);
00506       }
00507 
00512       storage_t & get_storage()
00513       {
00514          return storage_;
00515       }
00516 
00520 
00521       iterator begin() { return storage_.begin(); }
00522 
00523       iterator end() { return storage_.end(); }
00524 
00525       size_type size() const { return storage_.size(); }
00526 
00527       void resize(size_type const size) { storage_.resize(size); }
00528 
00529       bool empty() const { return storage_.empty(); }
00530 
00531       void clear() { storage_.clear(); }
00532 
00533       iterator find (
00534          void const * pkey, size_t const ksize
00535          )
00536       {
00537          key_type key(ksize);
00538          memcpy(&key[0], pkey, ksize);
00539          return storage_.find(key);
00540       }
00541 
00542       void erase(iterator itr)
00543       {
00544          storage_.erase(itr);
00545       }
00546 
00547       iterator insert(
00548          void const * pkey, size_t const ksize,
00549          pageinfo_t const & page_info
00550          )
00551       {
00552          key_type key(ksize);
00553          memcpy(&key[0], pkey, ksize);
00554          return storage_.insert(std::make_pair(key, page_info)).first;
00555       }
00556 
00558       bool itr2key(
00559          const_iterator itr,
00560          void * pkey, size_t & ksize
00561          ) const
00562       {
00563          bool ok = false;
00564          size_t const size = itr->first.size();
00565 
00566          if(size <= ksize)
00567          {
00568             memcpy(pkey, &itr->first[0], size);
00569             ok = true;
00570          }
00571 
00572          ksize = size;
00573 
00574          return ok;
00575       }
00576 
00577       void load(std::string const & fname, bool newdb)
00578       {
00579          if(newdb)
00580          {
00581             boost::filesystem::remove(fname);
00582 
00583          }
00584          else
00585          {
00586             std::ifstream in(fname.c_str(), std::ios::binary);
00587             in.exceptions(std::ios::badbit);
00588 
00589             key_type key;
00590 
00591             // Assume the stream is all ready for reading and exceptions are on
00592             while(in)
00593             {
00594                size_t size = 0;
00595                in.read(reinterpret_cast<char *>(&size), sizeof(size));
00596 
00597                if(in)
00598                {
00599                   key.resize(size);
00600                   in.read(&key[0], size * sizeof(typename storage_t::key_type::value_type));
00601 
00602                   if(in)
00603                   {
00604                      typename storage_t::data_type & val = storage_[key];
00605                      in.read(reinterpret_cast<char *>(&val.first), sizeof(val.first));
00606                      in.read(reinterpret_cast<char *>(&val.second), sizeof(val.second));
00607                   }
00608                }
00609             }
00610          }
00611       }
00612 
00613       void save(std::string const & fname) const
00614       {
00615          std::ofstream out(fname.c_str(), std::ios::binary | std::ios::trunc);
00616          if(!out) { throw std::runtime_error("Unable to open keyval index for writing"); }
00617 
00618          out.exceptions(std::ios::badbit | std::ios::failbit);
00619 
00620          // Assume the stream is all ready for writing and exceptions are on
00621          for(typename storage_t::const_iterator itr = storage_.begin() ; itr != storage_.end() ; ++itr)
00622          {
00624             size_t size = itr->first.size();
00625             out.write(reinterpret_cast<char *>(&size), sizeof(size));
00626             KVDS_PAGESTORE_FLUSH_STREAM__(out);
00627             out.write(&itr->first[0], size * sizeof(typename storage_t::key_type::value_type));
00628             KVDS_PAGESTORE_FLUSH_STREAM__(out);
00629 
00631             out.write(reinterpret_cast<char const *>(&itr->second.first), sizeof(itr->second.first));
00632             KVDS_PAGESTORE_FLUSH_STREAM__(out);
00633             out.write(reinterpret_cast<char const *>(&itr->second.second), sizeof(itr->second.second));
00634             KVDS_PAGESTORE_FLUSH_STREAM__(out);
00635          }
00636       }
00637 
00638    private:
00639       storage_t storage_;
00640    };
00641 
00643 
00644    template <
00645       typename PageMapT // Either intrinsic (built-in integer types such as int or long) or non-instrinsic key.
00646    >
00647    class KvdsPageStore : public IKvds
00648    {
00649    private:
00650       typedef size_t page_size_t;
00651       typedef PageMapT pagemap_t;
00652       typedef boost::uint8_t storeid_t;
00653       typedef typename PageMapT::itemid_t itemid_t;
00654       typedef typename PageMapT::pageinfo_t pageinfo_t;
00655 
00656       class Store
00657       {
00658       public:
00659          Store(page_size_t page_size) :
00660             page_size_(page_size), item_cnt_(0), free_list_(0xFFFF) // freelist size set to arbitrary 64K items (sparse map, low cost)
00661             {
00662                if(0 == page_size)
00663                {
00664                   throw std::invalid_argument("Page size must be greater than zero");
00665                }
00666 
00667                if(!moost::utils::is_power_of_two(page_size))
00668                {
00669                   throw std::invalid_argument("Page size must be a power of two");
00670                }
00671 
00672                // If we ever hit this number (it's huge) we probably have bigger fish to fry!
00673                free_list_.set_deleted_key(std::numeric_limits<itemid_t>::max());
00674             }
00675 
00676             ~Store()
00677             {
00678                if(store_.is_open())
00679                {
00680                   close();
00681                }
00682             }
00683 
00684             page_size_t get_page_size() const { return page_size_; }
00685 
00686             void open(char const dsname [], bool newdb)
00687             {
00688                if(page_size_ <= std::numeric_limits<boost::uint8_t>::max())
00689                {
00690                   openT<boost::uint8_t>(dsname,  newdb);
00691                }
00692                else
00693                if(page_size_ <= std::numeric_limits<boost::uint16_t>::max())
00694                {
00695                   openT<boost::uint16_t>(dsname,  newdb);
00696                }
00697                else
00698                if(page_size_ <= std::numeric_limits<boost::uint32_t>::max())
00699                {
00700                   openT<boost::uint32_t>(dsname,  newdb);
00701                }
00702 #ifndef WIN32 // 32 bit Windows will barf at this :(
00703                else
00704                if(page_size_ <= std::numeric_limits<boost::uint64_t>::max())
00705                {
00706                   openT<boost::uint64_t>(dsname,  newdb);
00707                }
00708 #endif
00709                else
00710                {
00711                   throw std::runtime_error("open failed, value size is unsupported");
00712                }
00713             }
00714 
00715             void save()
00716             {
00717                if(store_.is_open())
00718                {
00719                   // archive takes care of any io problems for us
00720                   std::ofstream out(freelist_fname_.c_str(), std::ios::binary);
00721                   boost::archive::binary_oarchive ar(out);
00722                   ar << free_list_;
00723                }
00724             }
00725 
00726             void close()
00727             {
00728                if(store_.is_open())
00729                {
00730                   store_.close();
00731                }
00732             }
00733 
00734             bool erase(itemid_t itemid)
00735             {
00736                bool found = false;
00737 
00738                if(exists(itemid))
00739                {
00740                   free_list_.insert(itemid);
00741                   --item_cnt_;
00742                   found = true;
00743                }
00744 
00745                return found;
00746             }
00747 
00748             bool exists(itemid_t itemid)
00749             {
00750                return ((itemid < (free_list_.size() + item_cnt_)) && free_list_.find(itemid) == free_list_.end());
00751             }
00752 
00753             size_t size() const
00754             {
00755                return item_cnt_;
00756             }
00757 
00758             bool empty()
00759             {
00760                return 0 == item_cnt_;
00761             }
00762 
00763             void clear()
00764             {
00765                // Clear indexes
00766                item_cnt_ = 0;
00767                free_list_.clear();
00768 
00769                // Close store and re-open as new (thus, truncating the store)
00770                store_.close();
00771                open(dsname_.c_str(), true);
00772             }
00773 
00774             bool size(itemid_t itemid, size_t & size)
00775             {
00776                bool ok = false;
00777 
00778                if(page_size_ <= std::numeric_limits<boost::uint8_t>::max())
00779                {
00780                   ok = sizeT<boost::uint8_t>(itemid, size);
00781                }
00782                else
00783                if(page_size_ <= std::numeric_limits<boost::uint16_t>::max())
00784                {
00785                   ok = sizeT<boost::uint16_t>(itemid, size);
00786                }
00787                else
00788                if(page_size_ <= std::numeric_limits<boost::uint32_t>::max())
00789                {
00790                   ok = sizeT<boost::uint32_t>(itemid, size);
00791                }
00792 #ifndef WIN32 // 32 bit Windows will barf at this :(
00793                else
00794                if(page_size_ <= std::numeric_limits<boost::uint64_t>::max())
00795                {
00796                   ok = sizeT<boost::uint64_t>(itemid, size);
00797                }
00798 #endif
00799                else
00800                {
00801                   throw std::runtime_error("size failed, value size is unsupported");
00802                }
00803 
00804                return ok;
00805             }
00806 
00807             bool read(void * data, page_size_t size, itemid_t itemid)
00808             {
00809                bool ok = false;
00810 
00811                if(page_size_ <= std::numeric_limits<boost::uint8_t>::max())
00812                {
00813                   ok = readT<boost::uint8_t>(data, size, itemid);
00814                }
00815                else
00816                if(page_size_ <= std::numeric_limits<boost::uint16_t>::max())
00817                {
00818                   ok = readT<boost::uint16_t>(data, size, itemid);
00819                }
00820                else
00821                if(page_size_ <= std::numeric_limits<boost::uint32_t>::max())
00822                {
00823                   ok = readT<boost::uint32_t>(data, size, itemid);
00824                }
00825 #ifndef WIN32 // 32 bit Windows will barf at this :(
00826                else
00827                if(page_size_ <= std::numeric_limits<boost::uint64_t>::max())
00828                {
00829                   ok = readT<boost::uint64_t>(data, size, itemid);
00830                }
00831 #endif
00832                else
00833                {
00834                   throw std::runtime_error("read failed, size is unsupported");
00835                }
00836 
00837                return ok;
00838             }
00839 
00840             void write(void const * data, page_size_t size, itemid_t itemid)
00841             {
00842                if(page_size_ <= std::numeric_limits<boost::uint8_t>::max())
00843                {
00844                   writeT<boost::uint8_t>(data, size, itemid);
00845                }
00846                else
00847                if(page_size_ <= std::numeric_limits<boost::uint16_t>::max())
00848                {
00849                   writeT<boost::uint16_t>(data, size, itemid);
00850                }
00851                else
00852                if(page_size_ <= std::numeric_limits<boost::uint32_t>::max())
00853                {
00854                   writeT<boost::uint32_t>(data, size, itemid);
00855                }
00856 #ifndef WIN32 // 32 bit Windows will barf at this :(
00857                else
00858                if(page_size_ <= std::numeric_limits<boost::uint64_t>::max())
00859                {
00860                   writeT<boost::uint64_t>(data, size, itemid);
00861                }
00862 #endif
00863                else
00864                {
00865                   throw std::runtime_error("write failed, value size is unsupported");
00866                }
00867             }
00868 
00869             void append(void const * data, page_size_t size, itemid_t itemid)
00870             {
00871                if(page_size_ <= std::numeric_limits<boost::uint8_t>::max())
00872                {
00873                   appendT<boost::uint8_t>(data, size, itemid);
00874                }
00875                else
00876                if(page_size_ <= std::numeric_limits<boost::uint16_t>::max())
00877                {
00878                   appendT<boost::uint16_t>(data, size, itemid);
00879                }
00880                else
00881                if(page_size_ <= std::numeric_limits<boost::uint32_t>::max())
00882                {
00883                   appendT<boost::uint32_t>(data, size, itemid);
00884                }
00885 #ifndef WIN32 // 32 bit Windows will barf at this :(
00886                else
00887                if(page_size_ <= std::numeric_limits<boost::uint64_t>::max())
00888                {
00889                   appendT<boost::uint64_t>(data, size, itemid);
00890                }
00891 #endif
00892                else
00893                {
00894                   throw std::runtime_error("append failed, size is unsupported");
00895                }
00896             }
00897 
00904             itemid_t get_next_free_id()
00905             {
00906                itemid_t itemid = boost::numeric_cast<itemid_t>(size());
00907 
00908                if(!free_list_.empty()) 
00909                {
00910                   itemid = *free_list_.begin();
00911                }
00912 
00913                if(itemid >= std::numeric_limits<itemid_t>::max())
00914                {
00916                   std::stringstream ss;
00917                   ss << store_fname_ << " store is full";
00918                   throw std::runtime_error(ss.str());
00919                }
00920 
00921                return itemid;
00922             }
00923 
00924             std::streampos get_item_pos(itemid_t itemid) const
00925             {
00926                std::streampos pos;
00927 
00928                if(page_size_ <= std::numeric_limits<boost::uint8_t>::max())
00929                {
00930                   pos = get_item_posT<boost::uint8_t>(itemid);
00931                }
00932                else
00933                if(page_size_ <= std::numeric_limits<boost::uint16_t>::max())
00934                {
00935                   pos = get_item_posT<boost::uint16_t>(itemid);
00936                }
00937                else
00938                if(page_size_ <= std::numeric_limits<boost::uint32_t>::max())
00939                {
00940                   pos = get_item_posT<boost::uint32_t>(itemid);
00941                }
00942 #ifndef WIN32 // 32 bit Windows will barf at this :(
00943                else
00944                if(page_size_ <= std::numeric_limits<boost::uint64_t>::max())
00945                {
00946                   pos = get_item_posT<boost::uint64_t>(itemid);
00947                }
00948 #endif
00949                else
00950                {
00951                   throw std::runtime_error("get_item_pos failed, value size is unsupported");
00952                }
00953 
00954                return pos;
00955             }
00956 
00957       private:
00958 
00959          template <typename valsizeT>
00960          void openT(char const dsname [], bool newdb)
00961          {
00962             if(store_.is_open()) { throw std::runtime_error("The store is already open"); }
00963 
00964             dsname_ = dsname;
00965 
00966             std::stringstream ssStore;
00967             ssStore << dsname_ << "_data." << page_size_;
00968             store_fname_ = ssStore.str();
00969 
00970             std::stringstream ssFreelist;
00971             ssFreelist << dsname_ << "_idx." << page_size_;
00972             freelist_fname_ = ssFreelist.str();
00973 
00974             if(!newdb && boost::filesystem::exists(store_fname_))
00975             {
00976                // Use existing store, sanity check size
00977                boost::uintmax_t const storefilesize = boost::filesystem::file_size(store_fname_);
00978                if(storefilesize % (sizeof(valsizeT) + page_size_)) {
00979                   throw std::runtime_error(std::string("Page size for store is invalid: ") + store_fname_);
00980                }
00981                item_cnt_ = boost::numeric_cast<size_t>((storefilesize / (sizeof(valsizeT) + page_size_)));
00982             }
00983             else
00984             {
00986                { std::ofstream(store_fname_.c_str(), std::ios::binary | std::ios::trunc); }
00987                { std::ofstream(freelist_fname_.c_str(), std::ios::binary | std::ios::trunc); }
00988             }
00989 
00990             // Open store
00991             store_.open(store_fname_.c_str(), std::ios::binary | std::ios::out | std::ios::in);
00992             if(!store_) { throw std::runtime_error(std::string("Unable to open store: ") + store_fname_); }
00993             store_.exceptions(std::ios::badbit | std::ios::failbit); 
00994 
00995             if(boost::filesystem::exists(freelist_fname_))
00996             {
00997                std::streamsize const freelistsize = boost::numeric_cast<std::streamsize>(boost::filesystem::file_size(freelist_fname_));
00998 
00999                if(freelistsize < 0) { throw std::runtime_error(std::string("Invalid free list size: ") + freelist_fname_); }
01000                else
01001                if(freelistsize > 0) // if the file has 0 bytes we can assume there was nothing in the free list
01002                {
01003                   // archive takes care of any io problems for us
01004                   std::ifstream in(freelist_fname_.c_str(), std::ios::binary);
01005                   boost::archive::binary_iarchive ar(in);
01006                   ar >> free_list_;
01007                   item_cnt_ -= free_list_.size(); // item count is less free list size.
01008                }
01009             }
01010          }
01011 
01012          template <typename valsizeT>
01013          bool sizeT(itemid_t itemid, size_t & size)
01014          {
01015             size = 0;
01016 
01017             bool found = exists(itemid);
01018 
01019             if(found)
01020             {
01021                std::streampos pos = get_item_pos(itemid);
01022 
01023                store_.clear();
01024                store_.seekg(pos);
01025 
01026                valsizeT esize = 0;
01027                store_.read(reinterpret_cast<char *>(&esize), sizeof(esize));
01028                size = esize;
01029             }
01030 
01031             return found;
01032          }
01033 
01034          template <typename valsizeT>
01035          bool readT(void * data, page_size_t size, itemid_t itemid)
01036          {
01037             bool found = exists(itemid);
01038 
01039             if(found)
01040             {
01041                if(size > page_size_) {
01042                   throw std::invalid_argument("Read size cannot be greater than page size");
01043                }
01044 
01045                std::streampos pos = get_item_pos(itemid);
01046 
01047                store_.clear();
01048                store_.seekg(pos);
01049 
01050                valsizeT esize = 0;
01051                store_.read(reinterpret_cast<char *>(&esize), sizeof(esize));
01052                if(size > esize) { throw std::invalid_argument("Read size cannot be greater than value size"); }
01053 
01054                store_.read(reinterpret_cast<char *>(data), size);
01055             }
01056 
01057             return found;
01058          }
01059 
01060          template <typename valsizeT>
01061          void writeT(void const * data, size_t size, itemid_t itemid)
01062          {
01063             if(size > page_size_) { throw std::invalid_argument("Write size cannot be greater than page size"); }
01064 
01065             valsizeT const tsize = boost::numeric_cast<valsizeT>(size);
01066 
01067             store_.clear();
01068             store_.seekp(get_item_pos(itemid));
01069 
01070             store_.write(reinterpret_cast<char const *>(&tsize), sizeof(tsize));
01071             KVDS_PAGESTORE_FLUSH_STREAM__(store_);
01072 
01073             store_.write(reinterpret_cast<char const *>(data), size);
01074             KVDS_PAGESTORE_FLUSH_STREAM__(store_);
01075 
01076             // If we're appending a new item we need to add padding
01077             if(itemid == (free_list_.size() + item_cnt_))
01078             {
01079                pad(size);
01080             }
01081 
01082             typename free_list_t::iterator itr = free_list_.find(itemid);
01083 
01084             if((itr != free_list_.end()) || (itemid == (free_list_.size() + item_cnt_)))
01085             {
01086                ++item_cnt_;
01087             }
01088 
01089             if(itr != free_list_.end())
01090             {
01091                free_list_.erase(itr);
01092             }
01093          }
01094 
01095          template <typename valsizeT>
01096          void appendT(void const * data, size_t size, itemid_t itemid)
01097          {
01098             size_t esize = 0;
01099             this->size(itemid, esize);
01100             valsizeT nsize = boost::numeric_cast<valsizeT>(esize + size);
01101 
01102             if(nsize > page_size_) { throw std::invalid_argument("Append size cannot be greater than page size"); }
01103 
01104             store_.clear();
01105             store_.seekp(get_item_pos(itemid));
01106 
01107             store_.write(reinterpret_cast<char const *>(&nsize), sizeof(nsize));
01108             KVDS_PAGESTORE_FLUSH_STREAM__(store_);
01109 
01110             store_.seekp(boost::numeric_cast<std::streamoff>(esize), std::ios::cur);
01111 
01112             store_.write(reinterpret_cast<char const *>(data), size);
01113             KVDS_PAGESTORE_FLUSH_STREAM__(store_);
01114 
01115             // If we're appending a new item we need to add padding
01116             if(itemid == (free_list_.size() + item_cnt_))
01117             {
01118                pad(nsize);
01119             }
01120 
01121             typename free_list_t::iterator itr = free_list_.find(itemid);
01122 
01123             if((itr != free_list_.end()) || (itemid == (free_list_.size() + item_cnt_)))
01124             {
01125                ++item_cnt_;
01126             }
01127 
01128             if(itr != free_list_.end())
01129             {
01130                free_list_.erase(itr);
01131             }
01132          }
01133 
01134          template <typename valsizeT>
01135          std::streampos get_item_posT(itemid_t itemid) const
01136          {
01137             return boost::numeric_cast<std::streampos>(itemid * (sizeof(valsizeT) + page_size_));
01138          }
01139 
01140       private:
01141          enum { PADSIZE = 0xFF };
01142          static char const * GetPadding()
01143          {
01144             static char const PADBUF [PADSIZE] = {0};
01145             return PADBUF;
01146          }
01147 
01148          void pad(page_size_t size) // Write padding in 0xFF byte blocks.
01149          {
01150             page_size_t pad_size = (page_size_ - size);
01151 
01152             if(pad_size)
01153             {
01154                static char const * const padbuf = GetPadding();
01155                page_size_t topad = 0;
01156 
01157                while(pad_size)
01158                {
01159                   topad = std::min(pad_size, (size_t)PADSIZE);
01160                   if(!store_.write(padbuf, topad)) { throw std::runtime_error("Unable to write padding"); }
01161                   KVDS_PAGESTORE_FLUSH_STREAM__(store_);
01162                   pad_size -= topad;
01163                }
01164             }
01165          }
01166 
01167       private:
01168          page_size_t const page_size_;
01169          size_t item_cnt_;
01170          typedef moost::container::sparse_hash_set<itemid_t> free_list_t;
01171          free_list_t free_list_;
01172          std::fstream store_;
01173          std::string store_fname_;
01174          std::string dsname_;
01175          std::string freelist_fname_;
01176       };
01177 
01178       typedef boost::shared_ptr<Store> store_t;
01179       typedef moost::container::sparse_hash_map<storeid_t, store_t> store_index_t;
01180       typedef std::bitset<sizeof(page_size_t) * 8> store_inventory_t;
01181 
01182    public:
01183       typedef pagemap_t store_type;
01184 
01185       KvdsPageStore() : iterating_(false) { }
01186 
01187       ~KvdsPageStore()
01188       {
01189          close();
01190       }
01191 
01192       // Allow the client to apply any specific settings to the underlying pagemap.
01193       pagemap_t & get_pagemap()
01194       {
01195          return pagemap_;
01196       }
01197 
01198       void open(
01199          char const dsname [],
01200          bool newdb = false
01201          )
01202       {
01203          if(!dsname_.empty()) { throw std::runtime_error("The store is already open"); }
01204 
01205          dsname_ = dsname;
01206 
01207          std::stringstream ssStoreInv;
01208          ssStoreInv << dsname_ << "_inv";
01209          storeinv_fname_ = ssStoreInv.str();
01210 
01211          if(newdb)
01212          {
01213             boost::filesystem::remove(pagemap_fname_);
01214          }
01215          else
01216          {
01217             std::ifstream in(storeinv_fname_.c_str());
01218             if(in.is_open())
01219             {
01220                in.exceptions(std::ios::badbit);
01221 
01222                // The inventory bitmap is a text file of 1's and 0's (eg. "0000010011010001")
01223                std::string s;
01224                in >> s;
01225 
01226                // create the store inventory bitset
01227                store_inventory_ = store_inventory_t(trim_inventory_string(s));
01228 
01229                // The number of 1's in s should correspond to the number of bits set in the store inventory bitset
01230                size_t expectedBitCount = std::count(s.begin(), s.end(), '1');
01231                if(expectedBitCount != store_inventory_.count())
01232                {
01233                   throw std::runtime_error("the format of the inventory file is incorrect");
01234                }
01235 
01236                // we seem to have a valid inventory so use it to create the stores
01237                for(page_size_t page_size = 1 ; page_size ; page_size <<= 1)
01238                {
01239                   storeid_t const storeid = moost::utils::msb_set(page_size);
01240 
01241                   if(store_inventory_[storeid])
01242                   {
01243                      store_t store(new Store(page_size));
01244                      store->open(dsname, newdb);
01245                      store_index_[storeid] = store;
01246                   }
01247                }
01248             }
01249          }
01250 
01251          // now the stores are open all that remains is to open the store index
01252          std::stringstream ssKeyValIndex;
01253          ssKeyValIndex << dsname_ << "_idx";
01254          pagemap_fname_ = ssKeyValIndex.str();
01255          pagemap_.load(pagemap_fname_, newdb);
01256       }
01257 
01258       void save()
01259       {
01260          save_impl();
01261       }
01262 
01263       void close()
01264       {
01265          if(!dsname_.empty())
01266          {
01267             save_impl(true);
01268             dsname_.clear();
01269          }
01270       }
01271 
01272    private:
01273 
01274       std::string trim_inventory_string(std::string s) const
01275       {
01276          std::string::size_type pos = s.find('1');
01277          if(pos != std::string::npos)
01278          {
01279             s = s.substr(pos);
01280          }
01281 
01282          return s;
01283       }
01284 
01285       void save_impl(bool bClose = false)
01286       {
01287          for(typename store_index_t::iterator itr = store_index_.begin() ; itr != store_index_.end() ; ++itr)
01288          {
01289             itr->second->save();
01290             if(bClose) { itr->second->close(); }
01291          }
01292 
01293          pagemap_.save(pagemap_fname_);
01294       }
01295 
01296       storeid_t const * get_storeid(page_size_t page_size)
01297       {
01298          page_size = moost::utils::next_power_of_two(page_size);
01299          storeid_t const storeid = moost::utils::msb_set(page_size);
01300          typename store_index_t::const_iterator itr = store_index_.find(storeid);
01301 
01302          if(itr == store_index_.end())
01303          {
01304             store_t store(new Store(page_size));
01305             store->open(dsname_.c_str(), true);
01306             itr = store_index_.insert(std::make_pair(storeid, store)).first;
01307 
01308             store_inventory_.set(storeid);
01309             std::ofstream out(storeinv_fname_.c_str());
01310             out << trim_inventory_string(store_inventory_.to_string());
01311          }
01312 
01313          return (itr == store_index_.end()) ? 0 : &itr->first;
01314       }
01315 
01316       store_t get_store(storeid_t const id) const
01317       {
01318          typename store_index_t::const_iterator itr = store_index_.find(id);
01319 
01320          assert(itr != store_index_.end() && itr->second);
01321 
01322          if(itr == store_index_.end() || !itr->second)
01323          {
01324             throw std::runtime_error("error retrieving store");
01325          }
01326 
01327          return itr->second;
01328       }
01329 
01330    public:
01333       store_type & get_store() { return pagemap_; }
01334 
01335    public: // IKvds interface implementation
01336 
01338       bool put(
01339          void const * pkey, size_t const ksize,
01340          void const * pval, size_t const vsize
01341          )
01342       {
01343          bool ok = false;
01344 
01345          storeid_t const * pstoreid = get_storeid(vsize);
01346 
01347          if(pstoreid)
01348          {
01349             typename pagemap_t::iterator itr = pagemap_.find(pkey, ksize);
01350 
01351             if(itr != pagemap_.end() && (itr->second.first != *pstoreid))
01352             {
01353                get_store(itr->second.first)->erase(itr->second.second);
01354                pagemap_.erase(itr);
01355                itr = pagemap_.end();
01356             }
01357 
01358             store_t store = get_store(*pstoreid);
01359             itemid_t itemid = (itr == pagemap_.end()) ? store->get_next_free_id() : itr->second.second;
01360             store->write(pval, vsize, itemid);
01361 
01362             if(itr == pagemap_.end())
01363             {
01364                pagemap_.insert(pkey, ksize, std::make_pair(*pstoreid, itemid));
01365             }
01366 
01367             ok = true;
01368          }
01369 
01370          return ok;
01371       }
01372 
01373       bool get(
01374          void const * pkey, size_t const ksize,
01375          void * pval, size_t & vsize
01376          )
01377       {
01378          bool found = false;
01379 
01380          typename pagemap_t::const_iterator itr = pagemap_.find(pkey, ksize);
01381 
01382          page_size_t esize = 0;
01383          if(itr != pagemap_.end())
01384          {
01385             store_t store = get_store(itr->second.first);
01386 
01387             if(store->size(itr->second.second, esize))
01388             {
01389                vsize = std::min(vsize, esize);
01390                found = store->read(pval, vsize, itr->second.second);
01391             }
01392          }
01393 
01394          if(!found) { vsize = 0; }
01395 
01396          return found;
01397       }
01398 
01400       bool add(
01401          void const * pkey, size_t const ksize,
01402          void const * pval, size_t const vsize
01403          )
01404       {
01405          bool ok = false;
01406 
01407          page_size_t esize = 0;
01408          page_size_t nsize = vsize;
01409 
01410          store_t store;
01411 
01412          typename pagemap_t::iterator itr = pagemap_.find(pkey, ksize);
01413          if(itr != pagemap_.end())
01414          {
01415             store = get_store(itr->second.first);
01416             store->size(itr->second.second, esize);
01417             nsize += esize;
01418          }
01419 
01420          storeid_t const * pstoreid = get_storeid(nsize);
01421 
01422          if(pstoreid)
01423          {
01424             store_t promoted_store = get_store(*pstoreid);
01425 
01426             if(itr != pagemap_.end() && (itr->second.first != *pstoreid))
01427             {
01428                itemid_t itemid = promoted_store->get_next_free_id();
01429 
01430                byte_array_t buf(esize);
01431                if(store->read(&buf[0], esize, itr->second.second))
01432                {
01433                   promoted_store->write(&buf[0], esize, itemid);
01434                }
01435 
01436                store->erase(itr->second.second);
01437                itr->second.first = (*pstoreid);
01438                itr->second.second = itemid;
01439 
01440                store = get_store(itr->second.first);
01441             }
01442 
01443             if(itr == pagemap_.end())
01444             {
01445                itr = pagemap_.insert(pkey, ksize, std::make_pair(*pstoreid, promoted_store->get_next_free_id()));
01446 
01447                store = get_store(itr->second.first);
01448             }
01449 
01450             store->append(pval, vsize, itr->second.second);
01451 
01452             ok = true;
01453          }
01454 
01455          return ok;
01456       }
01457 
01458       bool all(
01459          void const * pkey, size_t const ksize,
01460          void * pval, size_t & vsize
01461          )
01462       {
01463          bool found = false;
01464 
01465          typename pagemap_t::const_iterator itr = pagemap_.find(pkey, ksize);
01466 
01467          page_size_t esize = 0;
01468          if((itr != pagemap_.end()) && (get_store(itr->second.first)->size(itr->second.second, esize)))
01469          {
01470             if(esize <= vsize)
01471             {
01472                found = get(
01473                   pkey, ksize,
01474                   pval, esize
01475                   );
01476             }
01477 
01478             vsize = esize;
01479          }
01480          else { vsize = 0; }
01481 
01482          return found;
01483       }
01484 
01485       bool xst(
01486          void const * pkey, size_t const ksize
01487          )
01488       {
01489          bool found = false;
01490 
01491          typename pagemap_t::const_iterator itr = pagemap_.find(pkey, ksize);
01492          if(itr != pagemap_.end())
01493          {
01494             found = get_store(itr->second.first)->exists(itr->second.second);
01495          }
01496 
01497          return found;
01498       }
01499 
01500       bool del(
01501          void const * pkey, size_t const ksize
01502          )
01503       {
01504          bool found = false;
01505 
01506          typename pagemap_t::iterator itr = pagemap_.find(pkey, ksize);
01507          if(itr != pagemap_.end())
01508          {
01509             get_store(itr->second.first)->erase(itr->second.second);
01510             pagemap_.erase(itr);
01511             found = true;
01512          }
01513 
01514          return found;
01515       }
01516 
01517       bool clr()
01518       {
01519          // Truncates all filestores and indexes (this CANNOT be undone!)
01520 
01521          for(typename store_index_t::const_iterator itr = store_index_.begin() ; itr != store_index_.end() ; ++itr)
01522          {
01523             itr->second->clear();
01524          }
01525 
01526          pagemap_.clear();
01527 
01528          { std::ofstream(pagemap_fname_.c_str(), std::ios::binary | std::ios::trunc); }
01529 
01530          return true;
01531       }
01532 
01533       bool beg()
01534       {
01535          itr_ = pagemap_.begin();
01536          return iterating_ = true;
01537       }
01538 
01539       bool nxt(
01540          void * pkey, size_t & ksize
01541          )
01542       {
01543          bool found = false;
01544 
01545          if(iterating_ && (itr_ != pagemap_.end()))
01546          {
01547             found = pagemap_.itr2key(itr_, pkey, ksize);
01548             if(found) { ++itr_; }
01549          }
01550          else
01551          {
01552             ksize = 0;
01553             iterating_ = false;
01554          }
01555 
01556          return found;
01557       }
01558 
01559       bool end()
01560       {
01561          return iterating_ ? itr_ == pagemap_.end() : true;
01562       }
01563 
01564       bool siz(
01565          void const * pkey, size_t const ksize,
01566          size_t & vsize
01567          )
01568 
01569       {
01570          bool found = false;
01571 
01572          typename pagemap_t::const_iterator itr = pagemap_.find(pkey, ksize);
01573 
01574          if(itr != pagemap_.end())
01575          {
01576             found = get_store(itr->second.first)->size(itr->second.second, vsize);
01577          }
01578 
01579          return found;
01580       }
01581 
01582       bool cnt(boost::uint64_t & cnt)
01583       {
01584          cnt = pagemap_.size();
01585          return true;
01586       }
01587 
01588       bool nil(bool & isnil)
01589       {
01590          isnil =  pagemap_.empty();
01591          return true;
01592       }
01593 
01594    private:
01595       std::string dsname_;
01596       std::string pagemap_fname_;
01597       std::string storeinv_fname_;
01598       pagemap_t pagemap_;
01599       typename pagemap_t::const_iterator itr_;
01600       store_index_t store_index_;
01601       store_inventory_t store_inventory_;
01602       bool iterating_;
01603    };
01604 
01605 }}
01606 
01607 #endif // MOOST_KVDS_KVDS_PAGESTORE_HPP__