#![allow(unused)]

use std::{
    collections::{BTreeMap, VecDeque},
    fmt::Debug,
    fs::File,
    marker::PhantomData,
    mem::size_of,
    ops::Range,
    sync::Arc,
};

mod allocator;
mod atomic_arc;
mod mapped;
mod transaction;

use allocator::{AllocatorState, GeneralPurposeAllocator, SlabListPointer, SlabPointer};
use atomic_arc::AtomicArc;
use memmap::{Mmap, MmapMut};
use transaction::TransactionHandle;
use zerocopy::{AsBytes, FromBytes, FromZeroes, Ref, Unaligned, LE};

const PAGE_SIZE: u64 = 4096;

type U64 = zerocopy::byteorder::U64<LE>;
type U32 = zerocopy::byteorder::U32<LE>;
type U16 = zerocopy::byteorder::U16<LE>;

/// A typed pointer to a `T` stored in the database file.
#[derive(Clone, Copy, FromBytes, FromZeroes, AsBytes, Unaligned, Hash)]
#[repr(transparent)]
pub struct FilePointer<T> {
    inner: RawFilePointer,
    _phantom: PhantomData<*const T>,
}

impl<T> Debug for FilePointer<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        self.inner.fmt(f)
    }
}

impl<T> PartialOrd for FilePointer<T> {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        self.inner.partial_cmp(&other.inner)
    }
}

impl<T> Ord for FilePointer<T> {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.inner.cmp(&other.inner)
    }
}

impl<T> PartialEq for FilePointer<T> {
    fn eq(&self, other: &Self) -> bool {
        self.inner == other.inner
    }
}

impl<T> Eq for FilePointer<T> {}

impl<T> FilePointer<T> {
    fn from_range(range: FileRange) -> Self {
        assert_eq!(range.len(), size_of::<T>() as u64);
        Self::new(range.start)
    }
}

impl<T> FilePointer<T> {
    pub fn new(inner: RawFilePointer) -> Self {
        Self {
            inner,
            _phantom: PhantomData,
        }
    }

    pub fn null() -> Self {
        Self::new(RawFilePointer::null())
    }

    pub fn is_null(self) -> bool {
        self.inner.is_null()
    }

    pub fn range(self) -> FileRange {
        self.inner.range(size_of::<T>() as u64)
    }

    pub fn into_raw(self) -> RawFilePointer {
        self.inner
    }
}

/// An untyped byte offset into the database file.
#[derive(Clone, Copy, FromBytes, FromZeroes, AsBytes, Unaligned, Hash, PartialEq, Eq)]
#[repr(transparent)]
pub struct RawFilePointer(U64);

impl PartialOrd for RawFilePointer {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        self.0.get().partial_cmp(&other.0.get())
    }
}

impl Ord for RawFilePointer {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.0.get().cmp(&other.0.get())
    }
}

impl Debug for RawFilePointer {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "0x{:x}", self.0.get())
    }
}

impl RawFilePointer {
    fn page(self) -> PagePointer {
        PagePointer(u32::try_from(self.0.get() / PAGE_SIZE).unwrap().into())
    }

    fn page_offset(self) -> (PagePointer, u16) {
        (self.page(), (self.0.get() % PAGE_SIZE) as u16)
    }

    fn from_page_and_offset(page: PagePointer, offset: u16) -> Self {
        debug_assert!(
            offset < PAGE_SIZE as u16,
            "offset 0x{offset:x} out of page bounds (0..0x{PAGE_SIZE:x})"
        );
        page.start() + offset as u64
    }

    fn null() -> Self {
        Self(U64::ZERO)
    }

    fn is_null(self) -> bool {
        self == Self::null()
    }
}

/// Index of a `PAGE_SIZE`-sized page in the file.
#[derive(Clone, Copy, FromBytes, FromZeroes, AsBytes, Unaligned, Debug, PartialEq, Eq)]
#[repr(transparent)]
pub struct PagePointer(U32);

impl PagePointer {
    fn start(self) -> RawFilePointer {
        RawFilePointer((self.0.get() as u64 * PAGE_SIZE).into())
    }

    fn range(self) -> FileRange {
        self.start().range(PAGE_SIZE)
    }

    fn nth(n: u32) -> Self {
        Self(n.into())
    }

    fn null() -> Self {
        Self::nth(0)
    }
}

/// A contiguous byte range in the file.
#[derive(Clone, Copy, FromBytes, FromZeroes, AsBytes, Unaligned, PartialEq, Eq)]
#[repr(C)]
pub struct FileRange {
    start: RawFilePointer,
    len: U64,
}

impl Debug for FileRange {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{:?}..{:?}", self.start, self.start + self.len.get())
    }
}

impl std::ops::Add<u64> for RawFilePointer {
    type Output = Self;

    fn add(self, rhs: u64) -> Self::Output {
        Self((self.0.get() + rhs).into())
    }
}

impl RawFilePointer {
    pub fn range(&self, len: u64) -> FileRange {
        FileRange {
            start: *self,
            len: U64::new(len),
        }
    }
}

impl FileRange {
    fn len(&self) -> u64 {
        self.len.get()
    }

    fn as_range(&self) -> Range<usize> {
        let start = self.start.0.get().try_into().unwrap();
        start..start + usize::try_from(self.len()).unwrap()
    }
}

/// On-disk header stored at the start of the file. `R` is the type the root
/// pointer points to.
#[derive(Clone, Copy, FromBytes, FromZeroes, AsBytes, Unaligned)]
#[repr(C)]
struct Header<R> {
    magic: [u8; 16],
    root: FilePointer<R>,
    allocator_state: AllocatorState,
}

impl<R> Default for Header<R> {
    fn default() -> Self {
        Self {
            magic: *b"cool db format 1",
            root: FilePointer::null(),
            allocator_state: AllocatorState {
                general: RawFilePointer::null(),
                slabs: SlabListPointer(FilePointer::new(
                    RawFilePointer::null() + size_of::<Header<R>>() as u64,
                )),
            },
        }
    }
}

/// An immutable view of the file as of some published root.
struct Snapshot<R> {
    root: FilePointer<R>,
    map: Mmap,
}

impl<R> Snapshot<R> {
    fn read<T: FromBytes>(&self, at: FilePointer<T>) -> &T {
        self.read_range(at.range())
    }

    fn read_range<T: FromBytes>(&self, range: FileRange) -> &T {
        Ref::<_, T>::new(&self.map[range.as_range()])
            .unwrap()
            .into_ref()
    }

    fn read_raw(&self, range: FileRange) -> &[u8] {
        &self.map[range.as_range()]
    }
}

pub struct Reader<R> {
    state: Arc<AtomicArc<Snapshot<R>>>,
}

pub struct Db<R> {
    file: File,
    map: MmapMut,
    slabs: BTreeMap<u32, SlabPointer>,
    state: Arc<AtomicArc<Snapshot<R>>>,
    snapshots: VecDeque<SnapshotAndFreeList<R>>,
    _phantom: PhantomData<R>,
}

struct SnapshotAndFreeList<R> {
    snapshot: Arc<Snapshot<R>>,
    to_free: Vec<FileRange>,
}

impl<R> Db<R> {
    fn header(&self) -> &Header<R> {
        unsafe { self.reference_range_unchecked(Self::header_ptr().range()) }
    }

    fn header_mut(&mut self) -> &mut Header<R> {
        unsafe { self.modify_range_unchecked(Self::header_ptr().range()) }
    }

    // NOTE: only allowed before any data of `size` has been allocated
    fn add_slab(&mut self, size: u32) -> SlabPointer {
        let allocator_state = self.header().allocator_state;
        let slab = allocator_state.slabs.add_slab(self, size);
        assert!(self.slabs.insert(size, slab).is_none());
        slab
    }

    // NOTE: only allowed before any data of `size` has been allocated
    fn ensure_slab(&mut self, size: u32) -> SlabPointer {
        self.slabs
            .get(&size)
            .copied()
            .unwrap_or_else(|| self.add_slab(size))
    }

    /// Runs `f` with a transaction handle and publishes the root it returns
    /// as the new snapshot; the ranges the handle marked for freeing stay
    /// alive until no reader holds the old epoch anymore.
    fn transaction(&mut self, f: impl FnOnce(&mut TransactionHandle<R>) -> FilePointer<R>) {
        let mut handle = TransactionHandle::new(self);

        let root = f(&mut handle);

        let to_free = handle.to_free();

        let snapshot = self.update_root(root);

        self.snapshots
            .push_back(SnapshotAndFreeList { snapshot, to_free });
    }

    fn free_old_epochs(&mut self) {
        let mut snapshots = std::mem::take(&mut self.snapshots);

        while snapshots
            .front_mut()
            .is_some_and(|snapshot| Arc::get_mut(&mut snapshot.snapshot).is_some())
        {
            // if the snapshot is unique we are the only owner and can free the epoch.
            // println!("freeing epoch");
            let snapshot = snapshots.pop_front().unwrap();
            for allocation in snapshot.to_free {
                self.free(allocation);
            }
        }

        self.snapshots = snapshots;
    }

    fn header_ptr() -> FilePointer<Header<R>> {
        FilePointer::new(RawFilePointer(0.into()))
    }

    // Pointer to the `root` field of the header (it sits right after the
    // 16-byte magic).
    fn root_ptr() -> FilePointer<FilePointer<R>> {
        FilePointer::new(RawFilePointer((size_of::<[u8; 16]>() as u64).into()))
    }

    // Pointer to the `allocator_state` field of the header.
    fn allocator_state_ptr() -> RawFilePointer {
        RawFilePointer((size_of::<[u8; 16]>() as u64 + size_of::<FilePointer<R>>() as u64).into())
    }

    fn general_purpose_allocator() -> GeneralPurposeAllocator {
        GeneralPurposeAllocator {
            head_ptr: FilePointer::new(Self::allocator_state_ptr()),
        }
    }

    pub fn create_reader(&self) -> Reader<R> {
        Reader {
            state: self.state.clone(),
        }
    }

    fn update_root(&mut self, new_root: FilePointer<R>) -> Arc<Snapshot<R>> {
        // TODO: we could write something here and flush it for better
        // consistency, e.g.
        // a copy of the new root pointer.

        // flush all data in file
        self.map.flush().unwrap();

        // update root pointer and immediately flush
        unsafe {
            self.write(Self::root_ptr(), new_root);
        }

        self.map
            .flush_range(
                Self::root_ptr().into_raw().0.get() as usize,
                size_of::<RawFilePointer>(),
            )
            .unwrap();

        // update data that readers see
        self.state.swap(Arc::new(Snapshot {
            root: new_root,
            map: self.create_readonly_map(),
        }))
    }

    // TODO: fix pls
    #[track_caller]
    unsafe fn copy(&mut self, from: FileRange, to: FileRange) {
        let (head, tail) = self.map.split_at_mut(to.start.0.get() as usize);
        tail[0..to.len() as usize].copy_from_slice(&head[from.as_range()]);
    }

    #[track_caller]
    unsafe fn read<T: FromBytes>(&self, at: FilePointer<T>) -> T {
        self.read_range(at.range())
    }

    #[track_caller]
    unsafe fn read_range<T: FromBytes>(&self, range: FileRange) -> T {
        assert!(!range.start.is_null(), "null pointer dereference");

        Ref::<_, T>::new(&self.map[range.as_range()])
            .unwrap()
            .read()
    }

    #[track_caller]
    unsafe fn write<T: AsBytes>(&mut self, at: FilePointer<T>, data: T) {
        self.write_range(at.range(), data)
    }

    #[track_caller]
    unsafe fn write_range<T: AsBytes>(&mut self, range: FileRange, data: T) {
        assert!(!range.start.is_null(), "null pointer dereference");

        Ref::<_, T>::new(&mut self.map[range.as_range()])
            .unwrap()
            .write(data)
    }

    #[track_caller]
    unsafe fn modify<T: FromBytes + AsBytes>(&mut self, at: FilePointer<T>) -> &mut T {
        self.modify_range(at.range())
    }

    #[track_caller]
    unsafe fn modify_range<T: FromBytes + AsBytes>(&mut self, range: FileRange) -> &mut T {
        assert!(!range.start.is_null(), "null pointer dereference");

        self.modify_range_unchecked(range)
    }

    unsafe fn modify_range_unchecked<T: FromBytes + AsBytes>(
        &mut self,
        range: FileRange,
    ) -> &mut T {
        Ref::<_, T>::new(&mut self.map[range.as_range()])
            .unwrap()
            .into_mut()
    }

    #[track_caller]
    unsafe fn reference<T: FromBytes>(&self, at: FilePointer<T>) -> &T {
        self.reference_range(at.range())
    }

    #[track_caller]
    unsafe fn reference_range<T: FromBytes>(&self, range: FileRange) -> &T {
        assert!(!range.start.is_null(), "null pointer dereference");

        self.reference_range_unchecked(range)
    }

    unsafe fn reference_range_unchecked<T: FromBytes>(&self, range: FileRange) -> &T {
        Ref::<_, T>::new(&self.map[range.as_range()])
            .unwrap()
            .into_ref()
    }

    fn remap(&mut self) {
        let map = unsafe { MmapMut::map_mut(&self.file) }.unwrap();
        self.map = map;
    }

    fn create_readonly_map(&self) -> Mmap {
        unsafe { Mmap::map(&self.file) }.unwrap()
    }

    fn add_pages(&mut self, n: u64) -> PagePointer {
        // println!("adding {n} page{}", if n == 1 { "" } else { "s" });
        let len = self.file.metadata().unwrap().len();
        self.file.set_len(len + PAGE_SIZE * n).unwrap();
        self.remap();

        PagePointer::nth((len / PAGE_SIZE).try_into().unwrap())
    }

    fn remove_pages(&mut self, n: u64) {
        let len = self.file.metadata().unwrap().len();
        self.file
            .set_len(len.checked_sub(PAGE_SIZE * n).unwrap())
            .unwrap();
        self.remap();
    }

    pub fn create(file: File, slabs: &[u32]) -> Self {
        // clear file
        file.set_len(0).unwrap();
        file.set_len(PAGE_SIZE).unwrap();

        let map = unsafe { MmapMut::map_mut(&file) }.unwrap();

        let mut db = Self {
            state: Arc::new(AtomicArc::new(Arc::new(Snapshot {
                root: FilePointer::null(),
                map: unsafe { Mmap::map(&file).unwrap() },
            }))),
            file,
            map,
            slabs: BTreeMap::new(),
            snapshots: VecDeque::new(),
            _phantom: PhantomData,
        };

        unsafe {
            *db.header_mut() = Header::default();
            db.init_allocator(slabs);
        }

        let _ = db.state.swap(Arc::new(Snapshot {
            root: db.header().root,
            map: unsafe { Mmap::map(&db.file).unwrap() },
        }));

        db
    }

    pub fn open(file: File) -> Self {
        let map = unsafe { MmapMut::map_mut(&file) }.unwrap();

        let mut db = Self {
            state: Arc::new(AtomicArc::new(Arc::new(Snapshot {
                root: FilePointer::null(),
                map: unsafe {
                    Mmap::map(&file).unwrap()
                },
            }))),
            file,
            map,
            slabs: BTreeMap::new(),
            snapshots: VecDeque::new(),
            _phantom: PhantomData,
        };

        let _ = db.state.swap(Arc::new(Snapshot {
            root: db.header().root,
            map: unsafe { Mmap::map(&db.file).unwrap() },
        }));

        db
    }

    unsafe fn init_allocator(&mut self, slabs: &[u32]) {
        let allocator_state = self.header().allocator_state;
        allocator_state.slabs.init(
            self,
            (PAGE_SIZE - size_of::<Header<R>>() as u64)
                .try_into()
                .unwrap(),
        );

        for &size in slabs {
            self.ensure_slab(size);
        }
    }

    fn end_of_file(&self) -> RawFilePointer {
        RawFilePointer::null() + self.file.metadata().unwrap().len()
    }

    fn get_slab(&self, size: u64) -> Option<SlabPointer> {
        u32::try_from(size)
            .ok()
            .and_then(|size| self.slabs.get(&size))
            .copied()
    }

    // TODO: scrap the page-wise allocation and let slab allocations go through
    // the general allocator instead.
    pub fn allocate(&mut self, size: u64) -> FileRange {
        if let Some(slab) = self.get_slab(size) {
            slab.alloc(self)
        } else {
            Self::general_purpose_allocator().allocate(self, size)
        }
    }

    pub fn free(&mut self, range: FileRange) {
        if let Some(slab) = self.get_slab(range.len()) {
            slab.free(self, range)
        } else {
            Self::general_purpose_allocator().free(self, range)
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use mapped::ReaderTrait;
    use std::io::Write;
    use std::process::Stdio;

    #[derive(Debug, Clone, Copy)]
    enum Operation {
        Allocate { size: u64 },
        Free { index: usize },
    }

    fn causes_fragmentation(sequence: &[Operation], print: bool) -> bool {
        let mut db = Db::<()>::create(tempfile::tempfile().unwrap(), &[]);
        let allocator = Db::<()>::general_purpose_allocator();

        let mut ranges = Vec::new();

        for &operation in sequence {
            match operation {
                Operation::Allocate { size } => ranges.push(allocator.allocate(&mut db, size)),
                Operation::Free { index } => {
                    if ranges.get(index).is_some() {
                        allocator.free(&mut db, ranges.remove(index))
                    }
                }
            }
        }

        for range in ranges.drain(..) {
            allocator.free(&mut db, range);
        }

        fragmentation(&mut db, print) > 1
    }

    fn fragmentation(db: &mut Db<()>, print: bool) -> usize {
        let allocator = Db::<()>::general_purpose_allocator();

        let mut next = unsafe { db.read(GeneralPurposeAllocator::next_ptr(allocator.head_ptr)) };

        let mut n = 0;
        while !next.is_null() {
            let size = GeneralPurposeAllocator::size(db, next);
            if print {
                println!("\x1b[34m[{n}]\x1b[m {:?}", next.into_raw().range(size));
            }
            next = unsafe { db.read(GeneralPurposeAllocator::next_ptr(next)) };
            n += 1;
        }

        n
    }

    #[test]
    fn debug_fragmentation() {
        use Operation::*;

        #[rustfmt::skip]
        let mut sequence = vec![Allocate { size: 1946 }, Allocate { size: 3252 }, Free { index: 0 }, Allocate { size: 7391 }, Allocate { size: 394 }, Allocate { size: 3726 }, Allocate { size: 1429 }, Allocate { size: 3188 }, Allocate { size: 6375 }, Allocate { size: 4453 }, Allocate { size: 2514 }, Allocate { size: 4754 }, Allocate { size: 6785 }, Allocate { size: 2751 }, Allocate { size: 4107 }, Allocate { size: 3509 }, Allocate { size: 5897 }, Allocate { size: 7081 }, Allocate { size: 2419 }, Allocate { size: 5400 }, Allocate { size: 7135 }, Free { index: 14 }, Allocate { size: 2130 }, Free { index: 18 }, Allocate { size: 3450 }, Allocate { size: 1296 }, Allocate { size: 8091 }, Allocate { size: 4646 }, Allocate { size: 3891 }, Free { index: 0 }, Allocate { size: 1087 }, Allocate { size: 101 }, Allocate { size: 5353 }, Allocate { size: 3381 }, Allocate { size: 6869 }, Free { index: 1 }, Allocate { size: 3750 }, Allocate { size: 1398 }, Free { index: 22 }, Allocate { size: 18 }, Free { index: 25 }, Allocate { size: 642 }, Free { index: 4 }, Allocate { size: 4 }, Allocate { size: 1898 }, Allocate { size: 5259 }, Free { index: 26 }, Allocate { size: 3151 }, Allocate { size: 4989 }, Allocate { size: 6493 }, Allocate { size: 551 }, Allocate { size: 706 }, Allocate { size: 4161 }, Free { index: 16 }, Allocate { size: 3422 }, Allocate { size: 3011 }, Allocate { size: 5149 }, Allocate { size: 4687 }, Allocate { size: 5 }, Free { index: 34 }, Allocate { size: 191 }, Allocate { size: 2851 }, Allocate { size: 3597 }, Free { index: 28 }, Allocate { size: 7037 }, Allocate { size: 4660 }, Allocate { size: 194 }, Allocate { size: 5537 }, Allocate { size: 3242 }, Allocate { size: 6298 }, Allocate { size: 1239 }, Allocate { size: 7025 }, Allocate { size: 3563 }, Allocate { size: 5039 }, Free { index: 40 }, Allocate { size: 4549 }, Allocate { size: 5362 }, Allocate { size: 3510 }, Free { index: 31 }, Allocate { size: 226 }, Allocate { size: 6904 }, Allocate { size: 4150 }, Allocate { size: 4914 }, Allocate { size: 2330 }, Allocate { size: 2499 }, Allocate { size: 6677 }, Allocate { size: 95 }, Allocate { size: 3726 }, Allocate { size: 3258 }, Free { index: 2 }, Allocate { size: 2129 }, Allocate { size: 3674 }, Allocate { size: 1542 }, Allocate { size: 2210 }, Free { index: 21 }, Allocate { size: 3914 }, Allocate { size: 3108 }, Allocate { size: 1979 }, Allocate { size: 2677 }, Allocate { size: 8140 }, Allocate { size: 7573 }, Allocate { size: 121 }, Free { index: 59 }, Allocate { size: 6467 }, Allocate { size: 262 }, Allocate { size: 7711 }, Allocate { size: 2450 }, Allocate { size: 4351 }, Allocate { size: 4282 }, Free { index: 39 }, Allocate { size: 4050 }, Allocate { size: 67 }, Allocate { size: 5560 }, Free { index: 51 }, Allocate { size: 6038 }, Allocate { size: 555 }, Allocate { size: 1852 }, Free { index: 78 }, Allocate { size: 698 }];

        let mut prev_sequence = sequence.clone();
        while causes_fragmentation(&sequence, false) {
            prev_sequence = sequence.clone();
            sequence.pop();
        }

        println!("{prev_sequence:?}");

        let mut sequence = prev_sequence.clone();

        loop {
            let mut removed_something = false;

            let mut i = 0;
            while i < sequence.len() {
                let mut new_sequence = sequence.clone();
                new_sequence.remove(i);

                if causes_fragmentation(&new_sequence, false) {
                    removed_something = true;
                    println!("removed {i} ({:?})", sequence[i]);
                    sequence = new_sequence;
                } else {
                    for item in &mut new_sequence {
                        if let Operation::Free { index } = item {
                            if *index > i {
                                *index -= 1;
                            }
                        }
                    }

                    if causes_fragmentation(&new_sequence, false) {
                        removed_something = true;
                        println!("removed {i} ({:?}) after adjusting frees", sequence[i]);
                        sequence = new_sequence;
                    }
                }

                i += 1;
            }

            if !removed_something {
                break;
            }
        }

        loop {
            let mut merged_something = false;

            let mut i = 0;
            while i < sequence.len() {
                let mut new_sequence = sequence.clone();
                let removed = new_sequence.remove(i);

                if let Operation::Allocate { size: removed_size } = removed {
                    if let Some(Operation::Allocate { size }) = new_sequence.get_mut(i) {
                        *size += removed_size;
                    }
                }

                if causes_fragmentation(&new_sequence, false) {
                    merged_something = true;
                    println!(
                        "merged {} and {} ({:?} and {:?})",
                        i,
                        i + 1,
                        sequence[i],
                        sequence[i + 1]
                    );
                    sequence = new_sequence;
                } else {
                    for item in &mut new_sequence {
                        if let Operation::Free { index } = item {
                            if *index > i {
                                *index -= 1;
                            }
                        }
                    }

                    if causes_fragmentation(&new_sequence, false) {
                        merged_something = true;
                        println!(
                            "merged {} and {} ({:?} and {:?}) after adjusting frees",
                            i,
                            i + 1,
                            sequence[i],
                            sequence[i + 1]
                        );
                        sequence = new_sequence;
                    }
                }

                i += 1;
            }

            if !merged_something {
                break;
            }
        }

        println!("{sequence:?}");

        dbg!(causes_fragmentation(&sequence, true));
    }

    #[test]
    fn it_works() {
        let mut db = Db::<()>::create(tempfile::tempfile().unwrap(), &[4, 7, 16]);

        let mut ranges = Vec::new();

        for i in 0..10000 {
            ranges.push(db.allocate(i % 32));
        }

        let n = ranges.len();
        for range in ranges.drain(n / 4..n * 3 / 4) {
            db.free(range);
        }

        for i in 0..10000 {
            ranges.push(db.allocate((4 * i) % 32));
        }

        for range in ranges.drain(..) {
            db.free(range);
        }

        // hexdump(db.map.as_bytes());
    }

    #[test]
    fn transactions_work() {
        #[derive(Clone, Copy, FromBytes, FromZeroes, AsBytes, Unaligned)]
        #[repr(C)]
        struct DataHeader {
            generation: U64,
            list: FilePointer<DataList>,
        }

        #[derive(Clone, Copy, FromBytes, FromZeroes, AsBytes, Unaligned)]
        #[repr(C)]
        struct DataList {
            next: FilePointer<DataList>,
            data: U64,
        }

        let mut db = Db::<DataHeader>::create(
            tempfile::tempfile().unwrap(),
            &[size_of::<DataHeader>() as u32, size_of::<DataList>() as u32],
        );

        let mut snapshots = Vec::new();

        for i in 0..10 {
            db.transaction(|transaction| {
                let root = transaction.root();

                let root = if root.is_null() {
                    let (root, data) = transaction.allocate::<DataHeader>();
                    *data = DataHeader {
                        generation: 0.into(),
                        list: FilePointer::null(),
                    };
                    root
                } else {
                    root
                };

                let &data = transaction.read::<DataHeader>(root);
                assert_eq!(data.generation.get(), i);

                let n = {
                    let mut next = data.list;
                    let mut n = 0;
                    while !next.is_null() {
                        next = transaction.read::<DataList>(next).next;
                        n += 1;
                    }
                    n
                };

                let next = if n >= 5 {
                    transaction.read::<DataList>(data.list).next
                } else {
                    data.list
                };

                let (elem_ptr, element) = transaction.allocate::<DataList>();
                element.next = next;
                element.data = i.into();

                let (root, data) = transaction.modify::<DataHeader>(root);
                data.list = elem_ptr;
                data.generation = (i + 1).into();

                root
            });

            snapshots.push(db.create_reader().state.get());
        }

        for (i, snapshot) in snapshots.iter().enumerate() {
            let root = snapshot.read::<DataHeader>(snapshot.root);
            assert_eq!(root.generation.get(), i as u64 + 1);

            let mut items = Vec::new();

            let mut ptr = root.list;
            while !ptr.is_null() {
                let element = snapshot.read::<DataList>(ptr);
                items.push(element.data.get());
                ptr = element.next;
            }

            assert_eq!(items.len(), (i + 1).min(5));
            assert_eq!(items[0], i as u64);
            for (expected, &is) in items.iter().skip(1).rev().enumerate() {
                assert_eq!(expected as u64, is);
            }
        }

        drop(snapshots);

        // hexdump(db.map.as_bytes());

        db.free_old_epochs();

        // hexdump(db.map.as_bytes());
    }

    fn hexdump(bytes: &[u8]) {
        let mut child = std::process::Command::new("hexdump")
            .arg("-C")
            .stdin(Stdio::piped())
            .stdout(Stdio::inherit())
            .spawn()
            .unwrap();

        let mut stdin = child.stdin.take().expect("failed to get stdin");
        stdin.write_all(bytes).unwrap();
        std::mem::drop(stdin);

        child.wait().unwrap();
    }
}
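
// A minimal usage sketch added for illustration; it is not part of the
// original source. It only exercises APIs the tests above already use
// (`create`, `transaction`, `create_reader`, `Reader::state`,
// `free_old_epochs`) and assumes `TransactionHandle::allocate` behaves as in
// `transactions_work`, here with a plain `U64` as the root type.
#[cfg(test)]
mod usage_sketch {
    use super::*;

    #[test]
    fn single_value_root() {
        // one slab for the 8-byte root values; everything else would go
        // through the general purpose allocator
        let mut db = Db::<U64>::create(
            tempfile::tempfile().unwrap(),
            &[size_of::<U64>() as u32],
        );

        // each transaction allocates a fresh root value and publishes it as
        // the new snapshot root
        for i in 0..3u64 {
            db.transaction(|transaction| {
                let (root, value) = transaction.allocate::<U64>();
                *value = i.into();
                root
            });
        }

        // a reader always sees the most recently published root
        let snapshot = db.create_reader().state.get();
        assert_eq!(snapshot.read(snapshot.root).get(), 2);

        // once no reader holds an old epoch, its marked allocations can be freed
        drop(snapshot);
        db.free_old_epochs();
    }
}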