libmoost
/home/mhx/git/github/libmoost/include/moost/io/block_store.hpp
Go to the documentation of this file.
00001 /* vim:set ts=3 sw=3 sts=3 et: */
00028 #ifndef MOOST_IO_BLOCK_STORE_HPP__
00029 #define MOOST_IO_BLOCK_STORE_HPP__
00030 
#include <algorithm>
#include <fstream>
#include <stdexcept>
#include <string>
#include <vector>

#include <boost/cstdint.hpp>
#include <boost/filesystem/path.hpp>
#include <boost/filesystem/operations.hpp>

#include <boost/scoped_array.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread/mutex.hpp>

#include "../container/resource_stack.hpp"
00043 
00044 namespace moost { namespace io {
00045 
00049 class block_store
00050 {
00051 private:
00052    moost::container::resource_stack< std::fstream > m_rstreams;
00053    size_t                                           m_block_size;
00054    size_t                                           m_allocated;
00055    std::vector<size_t>                              m_free_list;
00056    boost::mutex                                     m_mutex;
00057 
00060    void free(size_t index)
00061    {
00062       boost::mutex::scoped_lock lock(m_mutex);
00063       if (index >= m_allocated)
00064          return; // weird
00065       if (index == m_allocated - 1)
00066          --m_allocated;
00067       else
00068       {
00069          std::vector<size_t>::iterator it = std::lower_bound(m_free_list.begin(), m_free_list.end(), index);
00070          if (it == m_free_list.end() || *it != index)
00071             m_free_list.insert(it, index);
00072       }
00073    }
00074 
00076    size_t alloc()
00077    {
00078       size_t ret_val;
00079       boost::mutex::scoped_lock lock(m_mutex);
00080       if (m_free_list.empty())
00081          ret_val = m_allocated++;
00082       else
00083       {
00084          ret_val = m_free_list.back();
00085          m_free_list.pop_back();
00086       }
00087       return ret_val;
00088    }
00089 
00091    std::streampos getpos(size_t index)
00092    {
00093       return static_cast<std::streampos>(sizeof(size_t)) + static_cast<std::streampos>( m_block_size ) * static_cast<std::streampos>( index );
00094    }
00095 
00096 public:
00097 
00099    class scoped_block
00100    {
00101    private:
00102       block_store &                                                     m_block_store;
00103       moost::container::resource_stack< std::fstream >::scoped_resource m_rstream;
00104       size_t                                                            m_index;
00105       bool                                                              m_free;
00106    public:
00108       scoped_block(block_store & block_store_)
00109          : m_block_store(block_store_),
00110            m_rstream(block_store_.m_rstreams),
00111            m_free(false)
00112       {
00113          m_index = m_block_store.alloc();
00114          std::streampos newPos = m_block_store.getpos(m_index);
00115          m_rstream->seekg( newPos );
00116          m_rstream->seekp( newPos );
00117       }
00119       scoped_block(block_store & block_store_, size_t index)
00120          : m_block_store(block_store_),
00121            m_rstream(block_store_.m_rstreams),
00122            m_index(index),
00123            m_free(false)
00124       {
00125          std::streampos newPos = m_block_store.getpos(m_index);
00126          m_rstream->seekg( newPos );
00127          m_rstream->seekp( newPos );
00128       }
00129       ~scoped_block()
00130       {
00131          if (m_free)
00132             m_block_store.free(m_index);
00133          else
00134             m_rstream->rdbuf()->pubsync();
00135       }
00136       void free()                  { m_free = true; }
00137       std::fstream & operator * () { return m_rstream.operator* (); }
00138       std::fstream * operator ->() { return m_rstream.operator->(); }
00139       size_t index()               { return m_index; }
00140       size_t block_size()          { return m_block_store.block_size(); }
00141    };
00142 
00144    block_store(const std::string & path,
00145       size_t block_size,
00146       size_t num_streams = 8  /* the number of concurrent streams accessing this blocks store */ )
00147       : m_block_size(block_size),
00148         m_allocated(0)
00149    {
00150       if (!boost::filesystem::exists(boost::filesystem::path(path)))
00151          std::fstream(path.c_str(), std::ios::binary | std::ios::out | std::ios::app); // poor man's touch
00152       else
00153       {
00154          std::fstream in(path.c_str(), std::ios::binary | std::ios::out | std::ios::in);
00155          in.read(reinterpret_cast<char *>(&m_allocated), sizeof(size_t));
00156          in.seekg(getpos(m_allocated));
00157          size_t free_list_size = 0;
00158          in.read(reinterpret_cast<char *>(&free_list_size), sizeof(size_t));
00159          m_free_list.resize(free_list_size);
00160          if (free_list_size > 0)
00161             in.read(reinterpret_cast<char *>(&m_free_list[0]), free_list_size * sizeof(size_t));
00162       }
00163       while (num_streams-- != 0)
00164       {
00165          boost::shared_ptr<std::fstream> pTmpStream( new std::fstream(
00166                   path.c_str(),
00167                   std::ios::binary | std::ios::out | std::ios::in));
00168 
00169          if ( ! *pTmpStream )
00170          {
00171             throw std::runtime_error("Cannot open output stream for <" + path + ">!");
00172          }
00173 
00174          m_rstreams.add_resource(pTmpStream);
00175       }
00176    }
00177 
00180    ~block_store()
00181    {
00182       moost::container::resource_stack< std::fstream >::scoped_resource rstream(m_rstreams);
00183       rstream->seekp(0);
00184       rstream->write(reinterpret_cast<const char *>(&m_allocated), sizeof(size_t));
00185       rstream->seekp(getpos(m_allocated));
00186       size_t free_list_size = m_free_list.size();
00187       rstream->write(reinterpret_cast<const char *>(&free_list_size), sizeof(size_t));
00188       if (free_list_size > 0)
00189          rstream->write(reinterpret_cast<const char *>(&m_free_list[0]), free_list_size * sizeof(size_t));
00190       rstream->rdbuf()->pubsync();
00191    }
00192 
00195    size_t allocated()
00196    {
00197       return m_allocated;
00198    }
00199 
00201    size_t block_size()
00202    {
00203       return m_block_size;
00204    }
00205 
00206 };
00207 
00208 }} // moost::io
00209 
00210 #endif // MOOST_IO_BLOCK_STORE_HPP__