libmoost
|
00001 /* vim:set ts=3 sw=3 sts=3 et: */ 00028 #ifndef MOOST_KVDS_KVDS_PAGESTORE_HPP__ 00029 #define MOOST_KVDS_KVDS_PAGESTORE_HPP__ 00030 00035 00036 #include <vector> 00037 #include <sstream> 00038 #include <fstream> 00039 #include <cassert> 00040 #include <limits> 00041 #include <bitset> 00042 #include <algorithm> 00043 #include <utility> 00044 #include <csignal> 00045 00046 #include <boost/cstdint.hpp> 00047 #include <boost/filesystem.hpp> 00048 #include <boost/shared_ptr.hpp> 00049 #include <boost/cast.hpp> 00050 00051 #include <boost/archive/binary_oarchive.hpp> 00052 #include <boost/archive/binary_iarchive.hpp> 00053 #include <boost/serialization/utility.hpp> 00054 00055 #include "../utils/bits.hpp" 00056 #include "../algorithm/fast_hash.hpp" 00057 #include "../container/sparse_hash_map.hpp" 00058 #include "../container/sparse_hash_set.hpp" 00059 00060 #include "../serialization/hashmap_serializer.hpp" 00061 #include "../serialization/hashset_serializer.hpp" 00062 00063 #include "ikvds.hpp" 00064 00066 #ifndef NDEBUG 00067 #define KVDS_PAGESTORE_FLUSH_STREAM__(S) S.flush() 00068 #else 00069 #define KVDS_PAGESTORE_FLUSH_STREAM__(S) 00070 #endif 00071 00075 00076 namespace moost { namespace kvds { 00077 00078 typedef std::vector<char> byte_array_t; 00079 00084 00087 00088 template <typename pagemapT> 00089 class KvdsPageMapShared 00090 { 00091 public: 00092 typedef pagemapT pagemap_t; 00093 typedef typename pagemap_t::size_type size_type; 00094 typedef typename pagemap_t::iterator iterator; 00095 typedef typename pagemap_t::const_iterator const_iterator; 00096 typedef typename pagemap_t::pageinfo_t pageinfo_t; 00097 typedef typename pagemap_t::itemid_t itemid_t; 00098 typedef typename pagemap_t::storage_t storage_t; 00099 typedef KvdsPageMapShared<pagemapT> this_type; 00100 00101 private: 00102 00108 struct metadata 00109 { 00110 typedef KvdsPageMapShared<pagemapT> shared_pagemap_t; 00111 typedef typename shared_pagemap_t::pagemap_t pagemap_t; 00112 typedef std::vector<shared_pagemap_t *> shared_pagemaps_t; 00113 00114 metadata(shared_pagemap_t * pshared) : 00115 isdirty(0), ppagemap(new pagemap_t), pshared_pagemaps(new shared_pagemaps_t) 00116 { 00117 typename shared_pagemaps_t::iterator itr = std::find(pshared_pagemaps->begin(), pshared_pagemaps->end(), pshared); 00118 00123 00124 assert(itr == pshared_pagemaps->end()); 00125 00129 if(itr == pshared_pagemaps->end()) 00130 { 00131 pshared_pagemaps->push_back(pshared); 00132 } 00133 } 00134 00135 bool is_owner(shared_pagemap_t * pshared) 00136 { 00137 // the first item in the list is always considered the owner. If the owner 00138 // is removed (when it is destroyed) the next item will become the owner. 00139 return !pshared_pagemaps->empty() && *(pshared_pagemaps->begin()) == pshared; 00140 } 00141 00142 00143 00145 void remove(shared_pagemap_t * pshared) 00146 { 00147 std::remove(pshared_pagemaps->begin(), pshared_pagemaps->end(), pshared); 00148 } 00149 00151 volatile std::sig_atomic_t isdirty; 00152 00155 boost::shared_ptr<pagemap_t> ppagemap; 00156 00159 boost::shared_ptr<shared_pagemaps_t> pshared_pagemaps; 00160 }; 00161 00162 boost::shared_ptr<metadata> pmetadata_; 00163 00164 public: 00165 00166 ~KvdsPageMapShared() 00167 { 00168 if(pmetadata_) 00169 { 00170 pmetadata_->remove(this); 00171 } 00172 } 00173 00175 void get_metadata_from(this_type const & rhs) 00176 { 00177 // make sure there is something to share 00178 rhs.validate_metadata_exists(); 00179 00180 // it's good to share 00181 pmetadata_ = rhs.pmetadata_; 00182 } 00183 00185 void give_metadata_to(this_type const & rhs) 00186 { 00187 // make sure there is something to share 00188 validate_metadata_exists(); 00189 00190 // it's good to share 00191 rhs.pmetadata_ = pmetadata_; 00192 } 00193 00194 storage_t & get_storage() 00195 { 00196 // use responsibly, remember using this function is NOT thread safe! 00197 create_metadata_ondemand(); 00198 return pmetadata_->ppagemap->get_storage(); 00199 } 00200 00204 00205 iterator begin() 00206 { 00207 create_metadata_ondemand(); 00208 00209 // since the iterator is non-const we have to assume the store is now dirty 00210 pmetadata_->isdirty = 1; 00211 return pmetadata_->ppagemap->begin(); 00212 } 00213 00214 iterator end() 00215 { 00216 create_metadata_ondemand(); 00217 return pmetadata_->ppagemap->end(); 00218 } 00219 00220 size_type size() const 00221 { 00222 return pmetadata_ ? pmetadata_->ppagemap->size() : 0; 00223 } 00224 00225 void resize(size_type const size) 00226 { 00227 create_metadata_ondemand(); 00228 pmetadata_->ppagemap->resize(size); 00229 } 00230 00231 bool empty() const 00232 { 00233 return pmetadata_ ? pmetadata_->ppagemap->empty() : true; 00234 } 00235 00236 void clear() 00237 { 00238 if(pmetadata_) 00239 { 00240 pmetadata_->isdirty = 1; 00241 pmetadata_->ppagemap->clear(); 00242 } 00243 } 00244 00245 iterator find ( 00246 void const * pkey, size_t const ksize 00247 ) 00248 { 00249 create_metadata_ondemand(); 00250 iterator itr = pmetadata_->ppagemap->find(pkey, ksize); 00251 00252 if(itr != end()) 00253 { 00254 // since the iterator is non-const we have to assume the store is now dirty 00255 pmetadata_->isdirty = 1; 00256 } 00257 00258 return itr; 00259 } 00260 00261 void erase(iterator itr) 00262 { 00263 if(pmetadata_) 00264 { 00265 pmetadata_->isdirty = 1; 00266 pmetadata_->ppagemap->erase(itr); 00267 } 00268 } 00269 00270 iterator insert( 00271 void const * pkey, size_t const ksize, 00272 pageinfo_t const & page_info 00273 ) 00274 { 00275 create_metadata_ondemand(); 00276 pmetadata_->isdirty = 1; 00277 return pmetadata_->ppagemap->insert(pkey, ksize, page_info); 00278 } 00279 00280 bool itr2key( 00281 const_iterator itr, 00282 void * pkey, size_t & ksize 00283 ) const 00284 { 00285 return pmetadata_ ? pmetadata_->ppagemap->itr2key(itr, pkey, ksize) : false; 00286 } 00287 00288 void load(std::string const & fname, bool newdb) 00289 { 00290 // if I'm the owner I'll load the data 00291 if(create_metadata_ondemand() && pmetadata_->is_owner(this)) 00292 { 00293 pmetadata_->isdirty = !boost::filesystem::exists(fname) || newdb ? 1 : 0; 00294 pmetadata_->ppagemap->load(fname, newdb); 00295 } 00296 } 00297 00298 void save(std::string const & fname) 00299 { 00300 // if there is metadata and we're the store ownder and the store is dirty then save. 00301 if(pmetadata_ && pmetadata_->is_owner(this) && pmetadata_->isdirty) 00302 { 00303 pmetadata_->ppagemap->save(fname); 00304 } 00305 } 00306 00307 private: 00308 void validate_metadata_exists() const 00309 { 00310 if(!pmetadata_) { throw std::runtime_error("shared pagemap metadata not found"); } 00311 } 00312 00317 bool create_metadata_ondemand() 00318 { 00319 if(!pmetadata_) 00320 { 00321 pmetadata_.reset(new metadata(this)); 00322 return true; 00323 } 00324 00325 return false; 00326 } 00327 }; 00328 00330 00332 00333 template <typename keyT> 00334 class KvdsPageMapIntrinsicKey 00335 { 00336 public: 00337 typedef boost::uint8_t storeid_t; 00338 typedef boost::uint32_t itemid_t; // 4 billion items per store should be enough for anyone. 00339 typedef std::pair<storeid_t, itemid_t> pageinfo_t; 00340 typedef keyT key_type; 00341 typedef moost::container::sparse_hash_map<key_type, pageinfo_t> storage_t; 00342 typedef typename storage_t::size_type size_type; 00343 typedef typename storage_t::iterator iterator; 00344 typedef typename storage_t::const_iterator const_iterator; 00345 00346 public: 00347 KvdsPageMapIntrinsicKey() 00348 { 00349 // Since we support deletions there must always be a deleted key. 00350 // By default it'll be the max value that can be represented by 00351 // the key type. This can be changed by making a call to "get_storage" 00352 // to get a handle to the underlying storage and then setting a new value. 00353 storage_.set_deleted_key(std::numeric_limits<key_type>::max()); 00354 00355 // set an arbitrary starting size for the sparse hashmap. Failure to set a 00356 // reasonable size won't cause a fail but it will cause a performance hit. 00357 // Since sparse hashmap is very good in terms of memory usage we can affort 00358 // to push the boat out a little and set a reasonable default starting size. 00359 storage_.resize(0xFFFFFF); 00360 } 00361 00364 // and yet different stores may have different requirements. For example the 00366 storage_t & get_storage() 00367 { 00368 return storage_; 00369 } 00370 00374 00375 iterator begin() { return storage_.begin(); } 00376 00377 iterator end() { return storage_.end(); } 00378 00379 size_type size() const { return storage_.size(); } 00380 00381 void resize(size_type const size) { storage_.resize(size); } 00382 00383 bool empty() const { return storage_.empty(); } 00384 00385 void clear() { storage_.clear(); } 00386 00387 iterator find ( 00388 void const * pkey, size_t const /*ksize*/ 00389 ) 00390 { 00391 key_type key = *((key_type*)pkey); 00392 return storage_.find(key); 00393 } 00394 00395 void erase(iterator itr) 00396 { 00397 assert(itr != storage_.end()); 00398 storage_.erase(itr); 00399 } 00400 00401 iterator insert( 00402 void const * pkey, size_t const /*ksize*/, 00403 pageinfo_t const & page_info 00404 ) 00405 { 00406 key_type key = *((key_type*)pkey); 00407 return storage_.insert(std::make_pair(key, page_info)).first; 00408 } 00409 00411 bool itr2key( 00412 const_iterator itr, 00413 void * pkey, size_t & ksize 00414 ) const 00415 { 00416 bool ok = false; 00417 size_t const size = sizeof(itr->first); 00418 00419 if(size <= ksize) 00420 { 00421 memcpy(pkey, &itr->first, size); 00422 ok = true; 00423 } 00424 00425 ksize = size; 00426 00427 return ok; 00428 } 00429 00430 void load(std::string const & fname, bool newdb) 00431 { 00432 if(newdb) 00433 { 00434 boost::filesystem::remove(fname); 00435 00436 } 00437 else 00438 // only try to load it if it exists 00439 if(boost::filesystem::exists(fname)) 00440 { 00441 // archive takes care of any io problems for us 00442 std::ifstream in(fname.c_str(), std::ios::binary); 00443 boost::archive::binary_iarchive ar(in); 00444 ar >> storage_; 00445 } 00446 } 00447 00448 void save(std::string const & fname) 00449 { 00450 // archive takes care of any io problems for us 00451 std::ofstream out(fname.c_str(), std::ios::binary); 00452 boost::archive::binary_oarchive ar(out); 00453 ar << storage_; 00454 } 00455 00456 private: 00457 storage_t storage_; 00458 }; 00459 00460 00462 struct KvdsPageMapDefaultKeyHashFunctor : public std::unary_function< byte_array_t, size_t > 00463 { 00464 size_t operator()(const byte_array_t & key) const 00465 { 00466 return moost::algorithm::fast_hash(&key[0], key.size() * sizeof(byte_array_t::value_type)); 00467 } 00468 }; 00469 00475 00477 00478 template <typename KeyHashFunctorT = KvdsPageMapDefaultKeyHashFunctor> 00479 class KvdsPageMapNonIntrinsicKey 00480 { 00481 public: 00482 typedef boost::uint8_t storeid_t; 00483 typedef boost::uint32_t itemid_t; // 4 billion items per store should be enough for anyone. 00484 typedef std::pair<storeid_t, itemid_t> pageinfo_t; 00485 typedef KeyHashFunctorT KeyHashFunctor; 00486 typedef moost::container::sparse_hash_map<byte_array_t, pageinfo_t, KeyHashFunctor> storage_t; 00487 typedef typename storage_t::key_type key_type; 00488 typedef typename storage_t::size_type size_type; 00489 typedef typename storage_t::iterator iterator; 00490 typedef typename storage_t::const_iterator const_iterator; 00491 00492 public: 00493 KvdsPageMapNonIntrinsicKey() 00494 { 00495 // Since we support deletions there must always be a deleted key. 00496 // By default it'll be an empty key (remember a key is just a vector 00497 // of bytes). This can be changed by making a call to "get_storage" 00498 // to get a handle to the underlying storage and then setting a new value. 00499 storage_.set_deleted_key(key_type()); 00500 00501 // set an arbitrary starting size for the sparse hashmap. Failure to set a 00502 // reasonable size won't cause a fail but it will cause a performance hit. 00503 // Since sparse hashmap is very good in terms of memory usage we can affort 00504 // to push the boat out a little and set a reasonable default starting size. 00505 storage_.resize(0xFFFFFF); 00506 } 00507 00512 storage_t & get_storage() 00513 { 00514 return storage_; 00515 } 00516 00520 00521 iterator begin() { return storage_.begin(); } 00522 00523 iterator end() { return storage_.end(); } 00524 00525 size_type size() const { return storage_.size(); } 00526 00527 void resize(size_type const size) { storage_.resize(size); } 00528 00529 bool empty() const { return storage_.empty(); } 00530 00531 void clear() { storage_.clear(); } 00532 00533 iterator find ( 00534 void const * pkey, size_t const ksize 00535 ) 00536 { 00537 key_type key(ksize); 00538 memcpy(&key[0], pkey, ksize); 00539 return storage_.find(key); 00540 } 00541 00542 void erase(iterator itr) 00543 { 00544 storage_.erase(itr); 00545 } 00546 00547 iterator insert( 00548 void const * pkey, size_t const ksize, 00549 pageinfo_t const & page_info 00550 ) 00551 { 00552 key_type key(ksize); 00553 memcpy(&key[0], pkey, ksize); 00554 return storage_.insert(std::make_pair(key, page_info)).first; 00555 } 00556 00558 bool itr2key( 00559 const_iterator itr, 00560 void * pkey, size_t & ksize 00561 ) const 00562 { 00563 bool ok = false; 00564 size_t const size = itr->first.size(); 00565 00566 if(size <= ksize) 00567 { 00568 memcpy(pkey, &itr->first[0], size); 00569 ok = true; 00570 } 00571 00572 ksize = size; 00573 00574 return ok; 00575 } 00576 00577 void load(std::string const & fname, bool newdb) 00578 { 00579 if(newdb) 00580 { 00581 boost::filesystem::remove(fname); 00582 00583 } 00584 else 00585 { 00586 std::ifstream in(fname.c_str(), std::ios::binary); 00587 in.exceptions(std::ios::badbit); 00588 00589 key_type key; 00590 00591 // Assume the stream is all ready for reading and exceptions are on 00592 while(in) 00593 { 00594 size_t size = 0; 00595 in.read(reinterpret_cast<char *>(&size), sizeof(size)); 00596 00597 if(in) 00598 { 00599 key.resize(size); 00600 in.read(&key[0], size * sizeof(typename storage_t::key_type::value_type)); 00601 00602 if(in) 00603 { 00604 typename storage_t::data_type & val = storage_[key]; 00605 in.read(reinterpret_cast<char *>(&val.first), sizeof(val.first)); 00606 in.read(reinterpret_cast<char *>(&val.second), sizeof(val.second)); 00607 } 00608 } 00609 } 00610 } 00611 } 00612 00613 void save(std::string const & fname) const 00614 { 00615 std::ofstream out(fname.c_str(), std::ios::binary | std::ios::trunc); 00616 if(!out) { throw std::runtime_error("Unable to open keyval index for writing"); } 00617 00618 out.exceptions(std::ios::badbit | std::ios::failbit); 00619 00620 // Assume the stream is all ready for writing and exceptions are on 00621 for(typename storage_t::const_iterator itr = storage_.begin() ; itr != storage_.end() ; ++itr) 00622 { 00624 size_t size = itr->first.size(); 00625 out.write(reinterpret_cast<char *>(&size), sizeof(size)); 00626 KVDS_PAGESTORE_FLUSH_STREAM__(out); 00627 out.write(&itr->first[0], size * sizeof(typename storage_t::key_type::value_type)); 00628 KVDS_PAGESTORE_FLUSH_STREAM__(out); 00629 00631 out.write(reinterpret_cast<char const *>(&itr->second.first), sizeof(itr->second.first)); 00632 KVDS_PAGESTORE_FLUSH_STREAM__(out); 00633 out.write(reinterpret_cast<char const *>(&itr->second.second), sizeof(itr->second.second)); 00634 KVDS_PAGESTORE_FLUSH_STREAM__(out); 00635 } 00636 } 00637 00638 private: 00639 storage_t storage_; 00640 }; 00641 00643 00644 template < 00645 typename PageMapT // Either intrinsic (built-in integer types such as int or long) or non-instrinsic key. 00646 > 00647 class KvdsPageStore : public IKvds 00648 { 00649 private: 00650 typedef size_t page_size_t; 00651 typedef PageMapT pagemap_t; 00652 typedef boost::uint8_t storeid_t; 00653 typedef typename PageMapT::itemid_t itemid_t; 00654 typedef typename PageMapT::pageinfo_t pageinfo_t; 00655 00656 class Store 00657 { 00658 public: 00659 Store(page_size_t page_size) : 00660 page_size_(page_size), item_cnt_(0), free_list_(0xFFFF) // freelist size set to arbitrary 64K items (sparse map, low cost) 00661 { 00662 if(0 == page_size) 00663 { 00664 throw std::invalid_argument("Page size must be greater than zero"); 00665 } 00666 00667 if(!moost::utils::is_power_of_two(page_size)) 00668 { 00669 throw std::invalid_argument("Page size must be a power of two"); 00670 } 00671 00672 // If we ever hit this number (it's huge) we probably have bigger fish to fry! 00673 free_list_.set_deleted_key(std::numeric_limits<itemid_t>::max()); 00674 } 00675 00676 ~Store() 00677 { 00678 if(store_.is_open()) 00679 { 00680 close(); 00681 } 00682 } 00683 00684 page_size_t get_page_size() const { return page_size_; } 00685 00686 void open(char const dsname [], bool newdb) 00687 { 00688 if(page_size_ <= std::numeric_limits<boost::uint8_t>::max()) 00689 { 00690 openT<boost::uint8_t>(dsname, newdb); 00691 } 00692 else 00693 if(page_size_ <= std::numeric_limits<boost::uint16_t>::max()) 00694 { 00695 openT<boost::uint16_t>(dsname, newdb); 00696 } 00697 else 00698 if(page_size_ <= std::numeric_limits<boost::uint32_t>::max()) 00699 { 00700 openT<boost::uint32_t>(dsname, newdb); 00701 } 00702 #ifndef WIN32 // 32 bit Windows will barf at this :( 00703 else 00704 if(page_size_ <= std::numeric_limits<boost::uint64_t>::max()) 00705 { 00706 openT<boost::uint64_t>(dsname, newdb); 00707 } 00708 #endif 00709 else 00710 { 00711 throw std::runtime_error("open failed, value size is unsupported"); 00712 } 00713 } 00714 00715 void save() 00716 { 00717 if(store_.is_open()) 00718 { 00719 // archive takes care of any io problems for us 00720 std::ofstream out(freelist_fname_.c_str(), std::ios::binary); 00721 boost::archive::binary_oarchive ar(out); 00722 ar << free_list_; 00723 } 00724 } 00725 00726 void close() 00727 { 00728 if(store_.is_open()) 00729 { 00730 store_.close(); 00731 } 00732 } 00733 00734 bool erase(itemid_t itemid) 00735 { 00736 bool found = false; 00737 00738 if(exists(itemid)) 00739 { 00740 free_list_.insert(itemid); 00741 --item_cnt_; 00742 found = true; 00743 } 00744 00745 return found; 00746 } 00747 00748 bool exists(itemid_t itemid) 00749 { 00750 return ((itemid < (free_list_.size() + item_cnt_)) && free_list_.find(itemid) == free_list_.end()); 00751 } 00752 00753 size_t size() const 00754 { 00755 return item_cnt_; 00756 } 00757 00758 bool empty() 00759 { 00760 return 0 == item_cnt_; 00761 } 00762 00763 void clear() 00764 { 00765 // Clear indexes 00766 item_cnt_ = 0; 00767 free_list_.clear(); 00768 00769 // Close store and re-open as new (thus, truncating the store) 00770 store_.close(); 00771 open(dsname_.c_str(), true); 00772 } 00773 00774 bool size(itemid_t itemid, size_t & size) 00775 { 00776 bool ok = false; 00777 00778 if(page_size_ <= std::numeric_limits<boost::uint8_t>::max()) 00779 { 00780 ok = sizeT<boost::uint8_t>(itemid, size); 00781 } 00782 else 00783 if(page_size_ <= std::numeric_limits<boost::uint16_t>::max()) 00784 { 00785 ok = sizeT<boost::uint16_t>(itemid, size); 00786 } 00787 else 00788 if(page_size_ <= std::numeric_limits<boost::uint32_t>::max()) 00789 { 00790 ok = sizeT<boost::uint32_t>(itemid, size); 00791 } 00792 #ifndef WIN32 // 32 bit Windows will barf at this :( 00793 else 00794 if(page_size_ <= std::numeric_limits<boost::uint64_t>::max()) 00795 { 00796 ok = sizeT<boost::uint64_t>(itemid, size); 00797 } 00798 #endif 00799 else 00800 { 00801 throw std::runtime_error("size failed, value size is unsupported"); 00802 } 00803 00804 return ok; 00805 } 00806 00807 bool read(void * data, page_size_t size, itemid_t itemid) 00808 { 00809 bool ok = false; 00810 00811 if(page_size_ <= std::numeric_limits<boost::uint8_t>::max()) 00812 { 00813 ok = readT<boost::uint8_t>(data, size, itemid); 00814 } 00815 else 00816 if(page_size_ <= std::numeric_limits<boost::uint16_t>::max()) 00817 { 00818 ok = readT<boost::uint16_t>(data, size, itemid); 00819 } 00820 else 00821 if(page_size_ <= std::numeric_limits<boost::uint32_t>::max()) 00822 { 00823 ok = readT<boost::uint32_t>(data, size, itemid); 00824 } 00825 #ifndef WIN32 // 32 bit Windows will barf at this :( 00826 else 00827 if(page_size_ <= std::numeric_limits<boost::uint64_t>::max()) 00828 { 00829 ok = readT<boost::uint64_t>(data, size, itemid); 00830 } 00831 #endif 00832 else 00833 { 00834 throw std::runtime_error("read failed, size is unsupported"); 00835 } 00836 00837 return ok; 00838 } 00839 00840 void write(void const * data, page_size_t size, itemid_t itemid) 00841 { 00842 if(page_size_ <= std::numeric_limits<boost::uint8_t>::max()) 00843 { 00844 writeT<boost::uint8_t>(data, size, itemid); 00845 } 00846 else 00847 if(page_size_ <= std::numeric_limits<boost::uint16_t>::max()) 00848 { 00849 writeT<boost::uint16_t>(data, size, itemid); 00850 } 00851 else 00852 if(page_size_ <= std::numeric_limits<boost::uint32_t>::max()) 00853 { 00854 writeT<boost::uint32_t>(data, size, itemid); 00855 } 00856 #ifndef WIN32 // 32 bit Windows will barf at this :( 00857 else 00858 if(page_size_ <= std::numeric_limits<boost::uint64_t>::max()) 00859 { 00860 writeT<boost::uint64_t>(data, size, itemid); 00861 } 00862 #endif 00863 else 00864 { 00865 throw std::runtime_error("write failed, value size is unsupported"); 00866 } 00867 } 00868 00869 void append(void const * data, page_size_t size, itemid_t itemid) 00870 { 00871 if(page_size_ <= std::numeric_limits<boost::uint8_t>::max()) 00872 { 00873 appendT<boost::uint8_t>(data, size, itemid); 00874 } 00875 else 00876 if(page_size_ <= std::numeric_limits<boost::uint16_t>::max()) 00877 { 00878 appendT<boost::uint16_t>(data, size, itemid); 00879 } 00880 else 00881 if(page_size_ <= std::numeric_limits<boost::uint32_t>::max()) 00882 { 00883 appendT<boost::uint32_t>(data, size, itemid); 00884 } 00885 #ifndef WIN32 // 32 bit Windows will barf at this :( 00886 else 00887 if(page_size_ <= std::numeric_limits<boost::uint64_t>::max()) 00888 { 00889 appendT<boost::uint64_t>(data, size, itemid); 00890 } 00891 #endif 00892 else 00893 { 00894 throw std::runtime_error("append failed, size is unsupported"); 00895 } 00896 } 00897 00904 itemid_t get_next_free_id() 00905 { 00906 itemid_t itemid = boost::numeric_cast<itemid_t>(size()); 00907 00908 if(!free_list_.empty()) 00909 { 00910 itemid = *free_list_.begin(); 00911 } 00912 00913 if(itemid >= std::numeric_limits<itemid_t>::max()) 00914 { 00916 std::stringstream ss; 00917 ss << store_fname_ << " store is full"; 00918 throw std::runtime_error(ss.str()); 00919 } 00920 00921 return itemid; 00922 } 00923 00924 std::streampos get_item_pos(itemid_t itemid) const 00925 { 00926 std::streampos pos; 00927 00928 if(page_size_ <= std::numeric_limits<boost::uint8_t>::max()) 00929 { 00930 pos = get_item_posT<boost::uint8_t>(itemid); 00931 } 00932 else 00933 if(page_size_ <= std::numeric_limits<boost::uint16_t>::max()) 00934 { 00935 pos = get_item_posT<boost::uint16_t>(itemid); 00936 } 00937 else 00938 if(page_size_ <= std::numeric_limits<boost::uint32_t>::max()) 00939 { 00940 pos = get_item_posT<boost::uint32_t>(itemid); 00941 } 00942 #ifndef WIN32 // 32 bit Windows will barf at this :( 00943 else 00944 if(page_size_ <= std::numeric_limits<boost::uint64_t>::max()) 00945 { 00946 pos = get_item_posT<boost::uint64_t>(itemid); 00947 } 00948 #endif 00949 else 00950 { 00951 throw std::runtime_error("get_item_pos failed, value size is unsupported"); 00952 } 00953 00954 return pos; 00955 } 00956 00957 private: 00958 00959 template <typename valsizeT> 00960 void openT(char const dsname [], bool newdb) 00961 { 00962 if(store_.is_open()) { throw std::runtime_error("The store is already open"); } 00963 00964 dsname_ = dsname; 00965 00966 std::stringstream ssStore; 00967 ssStore << dsname_ << "_data." << page_size_; 00968 store_fname_ = ssStore.str(); 00969 00970 std::stringstream ssFreelist; 00971 ssFreelist << dsname_ << "_idx." << page_size_; 00972 freelist_fname_ = ssFreelist.str(); 00973 00974 if(!newdb && boost::filesystem::exists(store_fname_)) 00975 { 00976 // Use existing store, sanity check size 00977 boost::uintmax_t const storefilesize = boost::filesystem::file_size(store_fname_); 00978 if(storefilesize % (sizeof(valsizeT) + page_size_)) { 00979 throw std::runtime_error(std::string("Page size for store is invalid: ") + store_fname_); 00980 } 00981 item_cnt_ = boost::numeric_cast<size_t>((storefilesize / (sizeof(valsizeT) + page_size_))); 00982 } 00983 else 00984 { 00986 { std::ofstream(store_fname_.c_str(), std::ios::binary | std::ios::trunc); } 00987 { std::ofstream(freelist_fname_.c_str(), std::ios::binary | std::ios::trunc); } 00988 } 00989 00990 // Open store 00991 store_.open(store_fname_.c_str(), std::ios::binary | std::ios::out | std::ios::in); 00992 if(!store_) { throw std::runtime_error(std::string("Unable to open store: ") + store_fname_); } 00993 store_.exceptions(std::ios::badbit | std::ios::failbit); 00994 00995 if(boost::filesystem::exists(freelist_fname_)) 00996 { 00997 std::streamsize const freelistsize = boost::numeric_cast<std::streamsize>(boost::filesystem::file_size(freelist_fname_)); 00998 00999 if(freelistsize < 0) { throw std::runtime_error(std::string("Invalid free list size: ") + freelist_fname_); } 01000 else 01001 if(freelistsize > 0) // if the file has 0 bytes we can assume there was nothing in the free list 01002 { 01003 // archive takes care of any io problems for us 01004 std::ifstream in(freelist_fname_.c_str(), std::ios::binary); 01005 boost::archive::binary_iarchive ar(in); 01006 ar >> free_list_; 01007 item_cnt_ -= free_list_.size(); // item count is less free list size. 01008 } 01009 } 01010 } 01011 01012 template <typename valsizeT> 01013 bool sizeT(itemid_t itemid, size_t & size) 01014 { 01015 size = 0; 01016 01017 bool found = exists(itemid); 01018 01019 if(found) 01020 { 01021 std::streampos pos = get_item_pos(itemid); 01022 01023 store_.clear(); 01024 store_.seekg(pos); 01025 01026 valsizeT esize = 0; 01027 store_.read(reinterpret_cast<char *>(&esize), sizeof(esize)); 01028 size = esize; 01029 } 01030 01031 return found; 01032 } 01033 01034 template <typename valsizeT> 01035 bool readT(void * data, page_size_t size, itemid_t itemid) 01036 { 01037 bool found = exists(itemid); 01038 01039 if(found) 01040 { 01041 if(size > page_size_) { 01042 throw std::invalid_argument("Read size cannot be greater than page size"); 01043 } 01044 01045 std::streampos pos = get_item_pos(itemid); 01046 01047 store_.clear(); 01048 store_.seekg(pos); 01049 01050 valsizeT esize = 0; 01051 store_.read(reinterpret_cast<char *>(&esize), sizeof(esize)); 01052 if(size > esize) { throw std::invalid_argument("Read size cannot be greater than value size"); } 01053 01054 store_.read(reinterpret_cast<char *>(data), size); 01055 } 01056 01057 return found; 01058 } 01059 01060 template <typename valsizeT> 01061 void writeT(void const * data, size_t size, itemid_t itemid) 01062 { 01063 if(size > page_size_) { throw std::invalid_argument("Write size cannot be greater than page size"); } 01064 01065 valsizeT const tsize = boost::numeric_cast<valsizeT>(size); 01066 01067 store_.clear(); 01068 store_.seekp(get_item_pos(itemid)); 01069 01070 store_.write(reinterpret_cast<char const *>(&tsize), sizeof(tsize)); 01071 KVDS_PAGESTORE_FLUSH_STREAM__(store_); 01072 01073 store_.write(reinterpret_cast<char const *>(data), size); 01074 KVDS_PAGESTORE_FLUSH_STREAM__(store_); 01075 01076 // If we're appending a new item we need to add padding 01077 if(itemid == (free_list_.size() + item_cnt_)) 01078 { 01079 pad(size); 01080 } 01081 01082 typename free_list_t::iterator itr = free_list_.find(itemid); 01083 01084 if((itr != free_list_.end()) || (itemid == (free_list_.size() + item_cnt_))) 01085 { 01086 ++item_cnt_; 01087 } 01088 01089 if(itr != free_list_.end()) 01090 { 01091 free_list_.erase(itr); 01092 } 01093 } 01094 01095 template <typename valsizeT> 01096 void appendT(void const * data, size_t size, itemid_t itemid) 01097 { 01098 size_t esize = 0; 01099 this->size(itemid, esize); 01100 valsizeT nsize = boost::numeric_cast<valsizeT>(esize + size); 01101 01102 if(nsize > page_size_) { throw std::invalid_argument("Append size cannot be greater than page size"); } 01103 01104 store_.clear(); 01105 store_.seekp(get_item_pos(itemid)); 01106 01107 store_.write(reinterpret_cast<char const *>(&nsize), sizeof(nsize)); 01108 KVDS_PAGESTORE_FLUSH_STREAM__(store_); 01109 01110 store_.seekp(boost::numeric_cast<std::streamoff>(esize), std::ios::cur); 01111 01112 store_.write(reinterpret_cast<char const *>(data), size); 01113 KVDS_PAGESTORE_FLUSH_STREAM__(store_); 01114 01115 // If we're appending a new item we need to add padding 01116 if(itemid == (free_list_.size() + item_cnt_)) 01117 { 01118 pad(nsize); 01119 } 01120 01121 typename free_list_t::iterator itr = free_list_.find(itemid); 01122 01123 if((itr != free_list_.end()) || (itemid == (free_list_.size() + item_cnt_))) 01124 { 01125 ++item_cnt_; 01126 } 01127 01128 if(itr != free_list_.end()) 01129 { 01130 free_list_.erase(itr); 01131 } 01132 } 01133 01134 template <typename valsizeT> 01135 std::streampos get_item_posT(itemid_t itemid) const 01136 { 01137 return boost::numeric_cast<std::streampos>(itemid * (sizeof(valsizeT) + page_size_)); 01138 } 01139 01140 private: 01141 enum { PADSIZE = 0xFF }; 01142 static char const * GetPadding() 01143 { 01144 static char const PADBUF [PADSIZE] = {0}; 01145 return PADBUF; 01146 } 01147 01148 void pad(page_size_t size) // Write padding in 0xFF byte blocks. 01149 { 01150 page_size_t pad_size = (page_size_ - size); 01151 01152 if(pad_size) 01153 { 01154 static char const * const padbuf = GetPadding(); 01155 page_size_t topad = 0; 01156 01157 while(pad_size) 01158 { 01159 topad = std::min(pad_size, (size_t)PADSIZE); 01160 if(!store_.write(padbuf, topad)) { throw std::runtime_error("Unable to write padding"); } 01161 KVDS_PAGESTORE_FLUSH_STREAM__(store_); 01162 pad_size -= topad; 01163 } 01164 } 01165 } 01166 01167 private: 01168 page_size_t const page_size_; 01169 size_t item_cnt_; 01170 typedef moost::container::sparse_hash_set<itemid_t> free_list_t; 01171 free_list_t free_list_; 01172 std::fstream store_; 01173 std::string store_fname_; 01174 std::string dsname_; 01175 std::string freelist_fname_; 01176 }; 01177 01178 typedef boost::shared_ptr<Store> store_t; 01179 typedef moost::container::sparse_hash_map<storeid_t, store_t> store_index_t; 01180 typedef std::bitset<sizeof(page_size_t) * 8> store_inventory_t; 01181 01182 public: 01183 typedef pagemap_t store_type; 01184 01185 KvdsPageStore() : iterating_(false) { } 01186 01187 ~KvdsPageStore() 01188 { 01189 close(); 01190 } 01191 01192 // Allow the client to apply any specific settings to the underlying pagemap. 01193 pagemap_t & get_pagemap() 01194 { 01195 return pagemap_; 01196 } 01197 01198 void open( 01199 char const dsname [], 01200 bool newdb = false 01201 ) 01202 { 01203 if(!dsname_.empty()) { throw std::runtime_error("The store is already open"); } 01204 01205 dsname_ = dsname; 01206 01207 std::stringstream ssStoreInv; 01208 ssStoreInv << dsname_ << "_inv"; 01209 storeinv_fname_ = ssStoreInv.str(); 01210 01211 if(newdb) 01212 { 01213 boost::filesystem::remove(pagemap_fname_); 01214 } 01215 else 01216 { 01217 std::ifstream in(storeinv_fname_.c_str()); 01218 if(in.is_open()) 01219 { 01220 in.exceptions(std::ios::badbit); 01221 01222 // The inventory bitmap is a text file of 1's and 0's (eg. "0000010011010001") 01223 std::string s; 01224 in >> s; 01225 01226 // create the store inventory bitset 01227 store_inventory_ = store_inventory_t(trim_inventory_string(s)); 01228 01229 // The number of 1's in s should correspond to the number of bits set in the store inventory bitset 01230 size_t expectedBitCount = std::count(s.begin(), s.end(), '1'); 01231 if(expectedBitCount != store_inventory_.count()) 01232 { 01233 throw std::runtime_error("the format of the inventory file is incorrect"); 01234 } 01235 01236 // we seem to have a valid inventory so use it to create the stores 01237 for(page_size_t page_size = 1 ; page_size ; page_size <<= 1) 01238 { 01239 storeid_t const storeid = moost::utils::msb_set(page_size); 01240 01241 if(store_inventory_[storeid]) 01242 { 01243 store_t store(new Store(page_size)); 01244 store->open(dsname, newdb); 01245 store_index_[storeid] = store; 01246 } 01247 } 01248 } 01249 } 01250 01251 // now the stores are open all that remains is to open the store index 01252 std::stringstream ssKeyValIndex; 01253 ssKeyValIndex << dsname_ << "_idx"; 01254 pagemap_fname_ = ssKeyValIndex.str(); 01255 pagemap_.load(pagemap_fname_, newdb); 01256 } 01257 01258 void save() 01259 { 01260 save_impl(); 01261 } 01262 01263 void close() 01264 { 01265 if(!dsname_.empty()) 01266 { 01267 save_impl(true); 01268 dsname_.clear(); 01269 } 01270 } 01271 01272 private: 01273 01274 std::string trim_inventory_string(std::string s) const 01275 { 01276 std::string::size_type pos = s.find('1'); 01277 if(pos != std::string::npos) 01278 { 01279 s = s.substr(pos); 01280 } 01281 01282 return s; 01283 } 01284 01285 void save_impl(bool bClose = false) 01286 { 01287 for(typename store_index_t::iterator itr = store_index_.begin() ; itr != store_index_.end() ; ++itr) 01288 { 01289 itr->second->save(); 01290 if(bClose) { itr->second->close(); } 01291 } 01292 01293 pagemap_.save(pagemap_fname_); 01294 } 01295 01296 storeid_t const * get_storeid(page_size_t page_size) 01297 { 01298 page_size = moost::utils::next_power_of_two(page_size); 01299 storeid_t const storeid = moost::utils::msb_set(page_size); 01300 typename store_index_t::const_iterator itr = store_index_.find(storeid); 01301 01302 if(itr == store_index_.end()) 01303 { 01304 store_t store(new Store(page_size)); 01305 store->open(dsname_.c_str(), true); 01306 itr = store_index_.insert(std::make_pair(storeid, store)).first; 01307 01308 store_inventory_.set(storeid); 01309 std::ofstream out(storeinv_fname_.c_str()); 01310 out << trim_inventory_string(store_inventory_.to_string()); 01311 } 01312 01313 return (itr == store_index_.end()) ? 0 : &itr->first; 01314 } 01315 01316 store_t get_store(storeid_t const id) const 01317 { 01318 typename store_index_t::const_iterator itr = store_index_.find(id); 01319 01320 assert(itr != store_index_.end() && itr->second); 01321 01322 if(itr == store_index_.end() || !itr->second) 01323 { 01324 throw std::runtime_error("error retrieving store"); 01325 } 01326 01327 return itr->second; 01328 } 01329 01330 public: 01333 store_type & get_store() { return pagemap_; } 01334 01335 public: // IKvds interface implementation 01336 01338 bool put( 01339 void const * pkey, size_t const ksize, 01340 void const * pval, size_t const vsize 01341 ) 01342 { 01343 bool ok = false; 01344 01345 storeid_t const * pstoreid = get_storeid(vsize); 01346 01347 if(pstoreid) 01348 { 01349 typename pagemap_t::iterator itr = pagemap_.find(pkey, ksize); 01350 01351 if(itr != pagemap_.end() && (itr->second.first != *pstoreid)) 01352 { 01353 get_store(itr->second.first)->erase(itr->second.second); 01354 pagemap_.erase(itr); 01355 itr = pagemap_.end(); 01356 } 01357 01358 store_t store = get_store(*pstoreid); 01359 itemid_t itemid = (itr == pagemap_.end()) ? store->get_next_free_id() : itr->second.second; 01360 store->write(pval, vsize, itemid); 01361 01362 if(itr == pagemap_.end()) 01363 { 01364 pagemap_.insert(pkey, ksize, std::make_pair(*pstoreid, itemid)); 01365 } 01366 01367 ok = true; 01368 } 01369 01370 return ok; 01371 } 01372 01373 bool get( 01374 void const * pkey, size_t const ksize, 01375 void * pval, size_t & vsize 01376 ) 01377 { 01378 bool found = false; 01379 01380 typename pagemap_t::const_iterator itr = pagemap_.find(pkey, ksize); 01381 01382 page_size_t esize = 0; 01383 if(itr != pagemap_.end()) 01384 { 01385 store_t store = get_store(itr->second.first); 01386 01387 if(store->size(itr->second.second, esize)) 01388 { 01389 vsize = std::min(vsize, esize); 01390 found = store->read(pval, vsize, itr->second.second); 01391 } 01392 } 01393 01394 if(!found) { vsize = 0; } 01395 01396 return found; 01397 } 01398 01400 bool add( 01401 void const * pkey, size_t const ksize, 01402 void const * pval, size_t const vsize 01403 ) 01404 { 01405 bool ok = false; 01406 01407 page_size_t esize = 0; 01408 page_size_t nsize = vsize; 01409 01410 store_t store; 01411 01412 typename pagemap_t::iterator itr = pagemap_.find(pkey, ksize); 01413 if(itr != pagemap_.end()) 01414 { 01415 store = get_store(itr->second.first); 01416 store->size(itr->second.second, esize); 01417 nsize += esize; 01418 } 01419 01420 storeid_t const * pstoreid = get_storeid(nsize); 01421 01422 if(pstoreid) 01423 { 01424 store_t promoted_store = get_store(*pstoreid); 01425 01426 if(itr != pagemap_.end() && (itr->second.first != *pstoreid)) 01427 { 01428 itemid_t itemid = promoted_store->get_next_free_id(); 01429 01430 byte_array_t buf(esize); 01431 if(store->read(&buf[0], esize, itr->second.second)) 01432 { 01433 promoted_store->write(&buf[0], esize, itemid); 01434 } 01435 01436 store->erase(itr->second.second); 01437 itr->second.first = (*pstoreid); 01438 itr->second.second = itemid; 01439 01440 store = get_store(itr->second.first); 01441 } 01442 01443 if(itr == pagemap_.end()) 01444 { 01445 itr = pagemap_.insert(pkey, ksize, std::make_pair(*pstoreid, promoted_store->get_next_free_id())); 01446 01447 store = get_store(itr->second.first); 01448 } 01449 01450 store->append(pval, vsize, itr->second.second); 01451 01452 ok = true; 01453 } 01454 01455 return ok; 01456 } 01457 01458 bool all( 01459 void const * pkey, size_t const ksize, 01460 void * pval, size_t & vsize 01461 ) 01462 { 01463 bool found = false; 01464 01465 typename pagemap_t::const_iterator itr = pagemap_.find(pkey, ksize); 01466 01467 page_size_t esize = 0; 01468 if((itr != pagemap_.end()) && (get_store(itr->second.first)->size(itr->second.second, esize))) 01469 { 01470 if(esize <= vsize) 01471 { 01472 found = get( 01473 pkey, ksize, 01474 pval, esize 01475 ); 01476 } 01477 01478 vsize = esize; 01479 } 01480 else { vsize = 0; } 01481 01482 return found; 01483 } 01484 01485 bool xst( 01486 void const * pkey, size_t const ksize 01487 ) 01488 { 01489 bool found = false; 01490 01491 typename pagemap_t::const_iterator itr = pagemap_.find(pkey, ksize); 01492 if(itr != pagemap_.end()) 01493 { 01494 found = get_store(itr->second.first)->exists(itr->second.second); 01495 } 01496 01497 return found; 01498 } 01499 01500 bool del( 01501 void const * pkey, size_t const ksize 01502 ) 01503 { 01504 bool found = false; 01505 01506 typename pagemap_t::iterator itr = pagemap_.find(pkey, ksize); 01507 if(itr != pagemap_.end()) 01508 { 01509 get_store(itr->second.first)->erase(itr->second.second); 01510 pagemap_.erase(itr); 01511 found = true; 01512 } 01513 01514 return found; 01515 } 01516 01517 bool clr() 01518 { 01519 // Truncates all filestores and indexes (this CANNOT be undone!) 01520 01521 for(typename store_index_t::const_iterator itr = store_index_.begin() ; itr != store_index_.end() ; ++itr) 01522 { 01523 itr->second->clear(); 01524 } 01525 01526 pagemap_.clear(); 01527 01528 { std::ofstream(pagemap_fname_.c_str(), std::ios::binary | std::ios::trunc); } 01529 01530 return true; 01531 } 01532 01533 bool beg() 01534 { 01535 itr_ = pagemap_.begin(); 01536 return iterating_ = true; 01537 } 01538 01539 bool nxt( 01540 void * pkey, size_t & ksize 01541 ) 01542 { 01543 bool found = false; 01544 01545 if(iterating_ && (itr_ != pagemap_.end())) 01546 { 01547 found = pagemap_.itr2key(itr_, pkey, ksize); 01548 if(found) { ++itr_; } 01549 } 01550 else 01551 { 01552 ksize = 0; 01553 iterating_ = false; 01554 } 01555 01556 return found; 01557 } 01558 01559 bool end() 01560 { 01561 return iterating_ ? itr_ == pagemap_.end() : true; 01562 } 01563 01564 bool siz( 01565 void const * pkey, size_t const ksize, 01566 size_t & vsize 01567 ) 01568 01569 { 01570 bool found = false; 01571 01572 typename pagemap_t::const_iterator itr = pagemap_.find(pkey, ksize); 01573 01574 if(itr != pagemap_.end()) 01575 { 01576 found = get_store(itr->second.first)->size(itr->second.second, vsize); 01577 } 01578 01579 return found; 01580 } 01581 01582 bool cnt(boost::uint64_t & cnt) 01583 { 01584 cnt = pagemap_.size(); 01585 return true; 01586 } 01587 01588 bool nil(bool & isnil) 01589 { 01590 isnil = pagemap_.empty(); 01591 return true; 01592 } 01593 01594 private: 01595 std::string dsname_; 01596 std::string pagemap_fname_; 01597 std::string storeinv_fname_; 01598 pagemap_t pagemap_; 01599 typename pagemap_t::const_iterator itr_; 01600 store_index_t store_index_; 01601 store_inventory_t store_inventory_; 01602 bool iterating_; 01603 }; 01604 01605 }} 01606 01607 #endif // MOOST_KVDS_KVDS_PAGESTORE_HPP__