// cow_file/src/lib.rs

#![allow(unused)]
use std::{
    collections::{BTreeMap, VecDeque},
    fmt::Debug,
    fs::File,
    hash::Hash,
    marker::PhantomData,
    mem::size_of,
    ops::Range,
    sync::Arc,
};
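/// Given a typed pointer to a struct, produces a typed pointer to one of its
/// fields by adding the field's offset. The `if false` block never runs; it
/// exists so the compiler proves that `$field` is a real field of `$type` and
/// unifies the result's pointee type with the field's type.
///
/// A usage sketch, with the types defined later in this file:
///
/// ```ignore
/// let header: FilePointer<Header> = FilePointer::null();
/// let root: FilePointer<RawFilePointer> = field_ptr!(header, Header, root);
/// ```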
macro_rules! field_ptr {
    ($base_ptr: expr, $type: ty, $field: ident) => {{
        let base: FilePointer<$type> = $base_ptr;
        let res = FilePointer::new(base.into_raw() + offset_of!($type, $field) as u64);
        if false {
            let ptr: Box<$type> = <$type as ::zerocopy::FromZeroes>::new_box_zeroed();
            let mut addr = ::core::ptr::addr_of!(ptr.$field);
            addr = res.typed_null_ptr();
        }
        res
    }};
}
mod allocator;
mod atomic_arc;
mod mapped;
mod transaction;
#[cfg(test)]
mod tests;
use allocator::{AllocatorState, GeneralPurposeAllocator, SlabListPointer, SlabPointer};
use atomic_arc::AtomicArc;
use memmap::{Mmap, MmapMut};
use memoffset::offset_of;
use transaction::TransactionHandle;
use zerocopy::{AsBytes, FromBytes, FromZeroes, Ref, Unaligned, LE};
const PAGE_SIZE: u64 = 4096;
type U64 = zerocopy::byteorder::U64<LE>;
type U32 = zerocopy::byteorder::U32<LE>;
type U16 = zerocopy::byteorder::U16<LE>;
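/// A file offset carrying a phantom type that records what is stored there.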
#[derive(FromBytes, FromZeroes, AsBytes, Unaligned)]
#[repr(transparent)]
pub struct FilePointer<T> {
    inner: RawFilePointer,
    _phantom: PhantomData<*const T>,
}
impl<T> Clone for FilePointer<T> {
    fn clone(&self) -> Self {
        Self {
            inner: self.inner,
            _phantom: PhantomData,
        }
    }
}
impl<T> Copy for FilePointer<T> {}
impl<T> Debug for FilePointer<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        self.inner.fmt(f)
    }
}
impl<T> PartialOrd for FilePointer<T> {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        self.inner.partial_cmp(&other.inner)
    }
}
impl<T> Ord for FilePointer<T> {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.inner.cmp(&other.inner)
    }
}
impl<T> PartialEq for FilePointer<T> {
    fn eq(&self, other: &Self) -> bool {
        self.inner == other.inner
    }
}
impl<T> Eq for FilePointer<T> {}
impl<T> Hash for FilePointer<T> {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        self.inner.hash(state);
    }
}
impl<T> FilePointer<T> {
    fn from_range(range: FileRange) -> Self {
        assert_eq!(range.len(), size_of::<T>() as u64);
        Self::new(range.start)
    }
    pub fn new(inner: RawFilePointer) -> Self {
        Self {
            inner,
            _phantom: PhantomData,
        }
    }
    pub fn null() -> Self {
        Self::new(RawFilePointer::null())
    }
    pub fn is_null(self) -> bool {
        self.inner.is_null()
    }
    pub fn range(self) -> FileRange {
        self.inner.range(size_of::<T>() as u64)
    }
    pub fn into_raw(self) -> RawFilePointer {
        self.inner
    }
    pub fn cast<U>(self) -> FilePointer<U> {
        FilePointer::new(self.into_raw())
    }
    #[doc(hidden)]
    pub fn typed_null_ptr(self) -> *const T {
        std::ptr::null()
    }
}
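/// An untyped byte offset into the file, stored as a little-endian `u64`.
/// Offset 0 holds the header, so it doubles as the null pointer.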
#[derive(Clone, Copy, FromBytes, FromZeroes, AsBytes, Unaligned, Hash, PartialEq, Eq)]
#[repr(transparent)]
pub struct RawFilePointer(U64);
impl PartialOrd for RawFilePointer {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        self.0.get().partial_cmp(&other.0.get())
    }
}
impl Ord for RawFilePointer {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.0.get().cmp(&other.0.get())
    }
}
impl Debug for RawFilePointer {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "0x{:x}", self.0.get())
    }
}
impl RawFilePointer {
    fn page(self) -> PagePointer {
        PagePointer(u32::try_from(self.0.get() / PAGE_SIZE).unwrap().into())
    }
    fn page_offset(self) -> (PagePointer, u16) {
        (self.page(), (self.0.get() % PAGE_SIZE) as u16)
    }
    fn from_page_and_offset(page: PagePointer, offset: u16) -> Self {
        debug_assert!(
            offset < PAGE_SIZE as u16,
            "offset 0x{offset:x} out of page bounds (0..0x{PAGE_SIZE:x})"
        );
        page.start() + offset as u64
    }
    fn null() -> Self {
        Self(U64::ZERO)
    }
    fn is_null(self) -> bool {
        self == Self::null()
    }
}
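/// The index of a `PAGE_SIZE` page in the file, stored as a little-endian `u32`.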
#[derive(Clone, Copy, FromBytes, FromZeroes, AsBytes, Unaligned, Debug, PartialEq, Eq)]
#[repr(transparent)]
pub struct PagePointer(U32);
impl PartialOrd for PagePointer {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        self.0.get().partial_cmp(&other.0.get())
    }
}
impl Ord for PagePointer {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.0.get().cmp(&other.0.get())
    }
}
impl PagePointer {
    fn start(self) -> RawFilePointer {
        RawFilePointer((self.0.get() as u64 * PAGE_SIZE).into())
    }
    fn range(self) -> FileRange {
        self.start().range(PAGE_SIZE)
    }
    fn nth(n: u32) -> Self {
        Self(n.into())
    }
    fn null() -> Self {
        Self::nth(0)
    }
    fn is_null(self) -> bool {
        self == Self::null()
    }
}
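/// A contiguous range of bytes in the file: a start pointer plus a length.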
#[derive(Clone, Copy, FromBytes, FromZeroes, AsBytes, Unaligned, PartialEq, Eq)]
#[repr(C)]
pub struct FileRange {
    start: RawFilePointer,
    len: U64,
}
impl Debug for FileRange {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{:?}..{:?}", self.start, self.start + self.len.get())
    }
}
impl std::ops::Add<u64> for RawFilePointer {
    type Output = Self;
    fn add(self, rhs: u64) -> Self::Output {
        Self((self.0.get() + rhs).into())
    }
}
impl RawFilePointer {
    pub fn range(&self, len: u64) -> FileRange {
        FileRange {
            start: *self,
            len: U64::new(len),
        }
    }
}
impl FileRange {
    fn start(self) -> RawFilePointer {
        self.start
    }
    fn end(self) -> RawFilePointer {
        self.start + (self.len() - 1)
    }
    fn len(&self) -> u64 {
        self.len.get()
    }
    fn as_range(&self) -> Range<usize> {
        let start = self.start.0.get().try_into().unwrap();
        start..start + usize::try_from(self.len()).unwrap()
    }
}
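/// The on-disk file header at offset 0: a magic string identifying the format,
/// the pointer to the current root object, and the allocator's persistent state.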
#[derive(Clone, Copy, FromBytes, FromZeroes, AsBytes, Unaligned)]
#[repr(C)]
struct Header {
    magic: [u8; 16],
    // If we made this a FilePointer<R>, as it logically should be, `AsBytes`
    // could no longer be derived.
    root: RawFilePointer,
    allocator_state: AllocatorState,
}
impl Default for Header {
    fn default() -> Self {
        Self {
            magic: *b"cool db format 1",
            root: RawFilePointer::null(),
            allocator_state: AllocatorState {
                general: FilePointer::null(),
                slabs: SlabListPointer(FilePointer::new(
                    RawFilePointer::null() + size_of::<Header>() as u64,
                )),
            },
        }
    }
}
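/// A read-only view of the database at a single point in time: the root that
/// was current when the snapshot was published, plus its own read-only mapping.
/// Readers hold an `Arc<Snapshot<R>>`, which pins the epoch's allocations
/// until the last holder drops it.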
struct Snapshot<R> {
    root: FilePointer<R>,
    map: Mmap,
}
impl<R> Snapshot<R> {
    fn read<T: FromBytes>(&self, at: FilePointer<T>) -> &T {
        self.read_range(at.range())
    }
    fn read_range<T: FromBytes>(&self, range: FileRange) -> &T {
        Ref::<_, T>::new(&self.map[range.as_range()])
            .unwrap()
            .into_ref()
    }
    fn read_raw(&self, range: FileRange) -> &[u8] {
        &self.map[range.as_range()]
    }
}
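/// An entry point for readers: `get` atomically loads the most recently
/// published snapshot, and holding the returned `Arc` keeps that snapshot's
/// allocations from being reused.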
pub struct Reader<R> {
    state: Arc<AtomicArc<Snapshot<R>>>,
}
impl<R> Reader<R> {
    fn get(&self) -> Arc<Snapshot<R>> {
        self.state.get()
    }
}
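/// The writer side of the database. It owns the file and a writable mapping,
/// tracks the slab allocators by size, publishes snapshots to readers via
/// `state`, and queues old snapshots until their allocations can be reclaimed.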
pub struct Db<R> {
    file: File,
    map: MmapMut,
    slabs: BTreeMap<u32, SlabPointer>,
    state: Arc<AtomicArc<Snapshot<R>>>,
    snapshots: VecDeque<SnapshotAndFreeList<R>>,
    _phantom: PhantomData<R>,
}
struct SnapshotAndFreeList<R> {
    snapshot: Arc<Snapshot<R>>,
    to_free: Vec<FileRange>,
}
impl FilePointer<Header> {
    fn root_ptr<R>(self) -> FilePointer<FilePointer<R>> {
        field_ptr!(self, Header, root).cast::<FilePointer<R>>()
    }
    fn allocator_state_ptr(self) -> FilePointer<AllocatorState> {
        field_ptr!(self, Header, allocator_state)
    }
}
impl<R> Db<R> {
    fn root(&self) -> FilePointer<R> {
        FilePointer::new(self.header().root)
    }
    fn header_ptr() -> FilePointer<Header> {
        FilePointer::null()
    }
    fn general_purpose_allocator() -> GeneralPurposeAllocator {
        GeneralPurposeAllocator {
            head_ptr: Self::header_ptr().allocator_state_ptr().general_ptr(),
        }
    }
    fn header(&self) -> &Header {
        unsafe { self.reference_range_unchecked(Self::header_ptr().range()) }
    }
    fn header_mut(&mut self) -> &mut Header {
        unsafe { self.modify_range_unchecked(Self::header_ptr().range()) }
    }
    // NOTE: only allowed before any data of `size` has been allocated
    fn add_slab(&mut self, size: u32) -> SlabPointer {
        let allocator_state = self.header().allocator_state;
        let slab = allocator_state.slabs.add_slab(self, size);
        assert!(self.slabs.insert(size, slab).is_none());
        slab
    }
    // NOTE: only allowed before any data of `size` has been allocated
    fn ensure_slab(&mut self, size: u32) -> SlabPointer {
        self.slabs
            .get(&size)
            .copied()
            .unwrap_or_else(|| self.add_slab(size))
    }
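    /// Runs `f` inside a copy-on-write transaction and publishes the root
    /// pointer it returns as the new current snapshot. Ranges released by the
    /// transaction are queued until no reader still uses them (see
    /// `free_old_epochs`).
    ///
    /// A minimal usage sketch; the closure body depends on the
    /// `TransactionHandle` API in the `transaction` module, and `new_root` is
    /// a placeholder:
    ///
    /// ```ignore
    /// db.transaction(|tx| {
    ///     // ...copy and modify the data reachable from the old root...
    ///     new_root
    /// });
    /// db.free_old_epochs(); // reclaim space from epochs no reader holds
    /// ```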
    fn transaction(&mut self, f: impl FnOnce(&mut TransactionHandle<R>) -> FilePointer<R>) {
        let mut handle = TransactionHandle::new(self);
        let root = f(&mut handle);
        let to_free = handle.to_free();
        let snapshot = self.update_root(root);
        self.snapshots
            .push_back(SnapshotAndFreeList { snapshot, to_free });
    }
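    /// Frees the allocations of old snapshots whose `Arc` has become unique,
    /// i.e. snapshots no reader holds anymore. Epochs are reclaimed strictly
    /// in order: the loop stops at the first snapshot that is still shared.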
    fn free_old_epochs(&mut self) {
        let mut snapshots = std::mem::take(&mut self.snapshots);
        while snapshots
            .front_mut()
            .is_some_and(|snapshot| Arc::get_mut(&mut snapshot.snapshot).is_some())
        {
            // If the Arc is unique, we are the only remaining owner, so no
            // reader can still see this epoch and its allocations can be freed.
            let snapshot = snapshots.pop_front().unwrap();
            for allocation in snapshot.to_free {
                self.free(allocation);
            }
        }
        self.snapshots = snapshots;
    }
    pub fn create_reader(&self) -> Reader<R> {
        Reader {
            state: self.state.clone(),
        }
    }
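    /// Flushes all data, then overwrites and flushes the root pointer in the
    /// header, and finally publishes a new snapshot to readers, returning the
    /// previously published one. The intent of the two-step flush is that a
    /// crash leaves the file with either the old root or the new one, never a
    /// root pointing at unflushed data.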
    fn update_root(&mut self, new_root: FilePointer<R>) -> Arc<Snapshot<R>> {
        // TODO: we could write some data + flush here for better consistency,
        // e.g. a copy of the new root pointer.
        // Flush all data in the file.
        self.map.flush().unwrap();
        // Update the root pointer and immediately flush it.
        unsafe {
            self.write(Self::header_ptr().root_ptr(), new_root);
        }
        self.map
            .flush_range(
                Self::header_ptr().root_ptr::<R>().into_raw().0.get() as usize,
                size_of::<RawFilePointer>(),
            )
            .unwrap();
        // Publish the new snapshot; `swap` hands back the old one.
        self.state.swap(Arc::new(Snapshot {
            root: new_root,
            map: self.create_readonly_map(),
        }))
    }
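    /// Copies the bytes of `from` into `to` within the mapping. Both ranges
    /// must have the same length and must not overlap; splitting the map at
    /// the higher range's start yields two disjoint mutable slices.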
    // TODO: fix pls
    #[track_caller]
    unsafe fn copy_nonoverlapping(&mut self, from: FileRange, to: FileRange) {
        let len = from.len();
        // The intervals must be non-overlapping and of the same size.
        assert!(!from.as_range().contains(&(to.start().0.get() as usize)));
        assert!(!from.as_range().contains(&(to.end().0.get() as usize)));
        assert!(!to.as_range().contains(&(from.start().0.get() as usize)));
        assert!(!to.as_range().contains(&(from.end().0.get() as usize)));
        assert_eq!(from.len(), to.len());
        if from.start > to.start {
            let (head, tail) = self.map.split_at_mut(from.start.0.get() as usize);
            head[to.as_range()].copy_from_slice(&tail[0..len as usize]);
        } else {
            let (head, tail) = self.map.split_at_mut(to.start.0.get() as usize);
            tail[0..len as usize].copy_from_slice(&head[from.as_range()]);
        }
    }
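    // The raw accessors below are `unsafe`: aside from the null check, nothing
    // verifies that the pointer or range refers to a live allocation holding a
    // valid `T`, so callers must guarantee that themselves.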
    #[track_caller]
    unsafe fn read<T: FromBytes>(&self, at: FilePointer<T>) -> T {
        self.read_range(at.range())
    }
    #[track_caller]
    unsafe fn read_range<T: FromBytes>(&self, range: FileRange) -> T {
        assert!(!range.start.is_null(), "null pointer dereference");
        Ref::<_, T>::new(&self.map[range.as_range()])
            .unwrap()
            .read()
    }
    #[track_caller]
    unsafe fn write<T: AsBytes>(&mut self, at: FilePointer<T>, data: T) {
        self.write_range(at.range(), data)
    }
    #[track_caller]
    unsafe fn write_range<T: AsBytes>(&mut self, range: FileRange, data: T) {
        assert!(!range.start.is_null(), "null pointer dereference");
        Ref::<_, T>::new(&mut self.map[range.as_range()])
            .unwrap()
            .write(data)
    }
    #[track_caller]
    unsafe fn modify<T: FromBytes + AsBytes>(&mut self, at: FilePointer<T>) -> &mut T {
        self.modify_range(at.range())
    }
    #[track_caller]
    unsafe fn modify_range<T: FromBytes + AsBytes>(&mut self, range: FileRange) -> &mut T {
        assert!(!range.start.is_null(), "null pointer dereference");
        self.modify_range_unchecked(range)
    }
    unsafe fn modify_range_unchecked<T: FromBytes + AsBytes>(
        &mut self,
        range: FileRange,
    ) -> &mut T {
        Ref::<_, T>::new(&mut self.map[range.as_range()])
            .unwrap()
            .into_mut()
    }
    #[track_caller]
    unsafe fn reference<T: FromBytes>(&self, at: FilePointer<T>) -> &T {
        self.reference_range(at.range())
    }
    #[track_caller]
    unsafe fn reference_range<T: FromBytes>(&self, range: FileRange) -> &T {
        assert!(!range.start.is_null(), "null pointer dereference");
        self.reference_range_unchecked(range)
    }
    unsafe fn reference_range_unchecked<T: FromBytes>(&self, range: FileRange) -> &T {
        Ref::<_, T>::new(&self.map[range.as_range()])
            .unwrap()
            .into_ref()
    }
    fn remap(&mut self) {
        let map = unsafe { MmapMut::map_mut(&self.file) }.unwrap();
        self.map = map;
    }
    fn create_readonly_map(&self) -> Mmap {
        unsafe { Mmap::map(&self.file) }.unwrap()
    }
    fn add_pages(&mut self, n: u64) -> PagePointer {
        let len = self.file.metadata().unwrap().len();
        self.file.set_len(len + PAGE_SIZE * n).unwrap();
        self.remap();
        PagePointer::nth((len / PAGE_SIZE).try_into().unwrap())
    }
    fn remove_pages(&mut self, n: u64) {
        let len = self.file.metadata().unwrap().len();
        self.file
            .set_len(len.checked_sub(PAGE_SIZE * n).unwrap())
            .unwrap();
        self.remap();
    }
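    /// Creates a fresh database in `file`, truncating any previous contents,
    /// and sets up a slab allocator for each size in `slabs`.
    ///
    /// A usage sketch (the file name and root type are placeholders):
    ///
    /// ```ignore
    /// let file = File::options().read(true).write(true).create(true).open("data.db")?;
    /// let mut db: Db<MyRoot> = Db::create(file, &[16, 32]);
    /// ```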
    pub fn create(file: File, slabs: &[u32]) -> Self {
        // Clear the file, then start with a single page for the header.
        file.set_len(0).unwrap();
        file.set_len(PAGE_SIZE).unwrap();
        let map = unsafe { MmapMut::map_mut(&file) }.unwrap();
        let mut db = Self {
            state: Arc::new(AtomicArc::new(Arc::new(Snapshot {
                root: FilePointer::null(),
                map: unsafe { Mmap::map(&file).unwrap() },
            }))),
            file,
            map,
            slabs: BTreeMap::new(),
            snapshots: VecDeque::new(),
            _phantom: PhantomData,
        };
        unsafe {
            *db.header_mut() = Header::default();
            db.init_allocator(slabs);
        }
        let _ = db.state.swap(Arc::new(Snapshot {
            root: db.root(),
            map: unsafe { Mmap::map(&db.file).unwrap() },
        }));
        db
    }
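    /// Opens an existing database file and publishes its current root as the
    /// initial snapshot. (As written, this does not repopulate the in-memory
    /// `slabs` table.)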
    pub fn open(file: File) -> Self {
        let map = unsafe { MmapMut::map_mut(&file) }.unwrap();
        let mut db = Self {
            state: Arc::new(AtomicArc::new(Arc::new(Snapshot {
                root: FilePointer::null(),
                map: unsafe { Mmap::map(&file).unwrap() },
            }))),
            file,
            map,
            slabs: BTreeMap::new(),
            snapshots: VecDeque::new(),
            _phantom: PhantomData,
        };
        let _ = db.state.swap(Arc::new(Snapshot {
            root: db.root(),
            map: unsafe { Mmap::map(&db.file).unwrap() },
        }));
        db
    }
    unsafe fn init_allocator(&mut self, slabs: &[u32]) {
        let allocator_state = self.header().allocator_state;
        allocator_state.slabs.init(
            self,
            (PAGE_SIZE - size_of::<Header>() as u64).try_into().unwrap(),
        );
        for &size in slabs {
            self.ensure_slab(size);
        }
    }
    fn end_of_file(&self) -> RawFilePointer {
        RawFilePointer::null() + self.file.metadata().unwrap().len()
    }
    fn get_slab(&self, size: u64) -> Option<SlabPointer> {
        u32::try_from(size)
            .ok()
            .and_then(|size| self.slabs.get(&size))
            .copied()
    }
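    /// Allocates `size` bytes in the file. Zero-sized requests yield the null
    /// range, sizes with a dedicated slab are served from that slab, and all
    /// other sizes fall back to the general-purpose allocator; `free` routes
    /// a range back the same way.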
    // TODO: scrap the page-wise allocation and have slabs allocate their
    // backing memory from the general allocator.
    pub fn allocate(&mut self, size: u64) -> FileRange {
        if size == 0 {
            return RawFilePointer::null().range(0);
        }
        if let Some(slab) = self.get_slab(size) {
            slab.alloc(self)
        } else {
            Self::general_purpose_allocator().allocate(self, size)
        }
    }
    pub fn free(&mut self, range: FileRange) {
        if range.len() == 0 {
            return;
        }
        if let Some(slab) = self.get_slab(range.len()) {
            slab.free(self, range)
        } else {
            Self::general_purpose_allocator().free(self, range)
        }
    }
}