diff --git a/src/allocator.rs b/src/allocator.rs index f638248..9afe9a5 100644 --- a/src/allocator.rs +++ b/src/allocator.rs @@ -17,10 +17,10 @@ impl SlabKind { Self::SingleBytes } else if size < size_of::() as u32 { Self::RelativeFreeList - } else if (size as u64) <= PAGE_SIZE { + } else if (size as u64) <= PAGE_SIZE / 2 { // TODO - // slabs of really big object are very inefficient. - // find a better way/ allocate more pages at once e.g. at least 10 elements? + // slabs of really big objects are very inefficient. + // find a better way/ allocate e.g. more pages at once or at least 10 elements? Self::AbsoluteFreeList } else { panic!("invalid size") diff --git a/src/lib.rs b/src/lib.rs index 41e83e1..b7c393b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -151,6 +151,22 @@ struct Snapshot { map: Mmap, } +impl Snapshot { + fn read(&self, at: FilePointer) -> &T { + self.read_range(at.range(size_of::() as u64)) + } + + fn read_range(&self, range: FileRange) -> &T { + LayoutVerified::<_, T>::new(&self.map[range.as_range()]) + .unwrap() + .into_ref() + } + + fn read_raw(&self, range: FileRange) -> &[u8] { + &self.map[range.as_range()] + } +} + pub struct Reader { state: Arc>, } @@ -160,12 +176,18 @@ pub struct Db { map: MmapMut, slabs: BTreeMap, state: Arc>, + snapshots: Vec, +} + +struct SnapshotAndFreeList { + snapshot: Arc, + to_free: Vec, } #[derive(Clone, Copy)] struct Replaced { from: FileRange, - to: Option, + to: FileRange, } pub struct TransactionHandle<'t> { @@ -175,35 +197,23 @@ pub struct TransactionHandle<'t> { } impl<'t> TransactionHandle<'t> { - unsafe fn get_data(&mut self, range: FileRange) -> &mut [u8] { - &mut self.db.map[range.as_range()] - } - - pub unsafe fn modify(&mut self, range: FileRange) -> (FileRange, &mut [u8]) { - let new = if let Some(&replaced) = self.replaced.get(&range.start) { + pub unsafe fn modify_raw(&mut self, range: FileRange) -> (FileRange, &mut [u8]) { + let range = if let Some(&replaced) = self.replaced.get(&range.start) { assert_eq!(replaced.from, range); - - if let Some(to) = replaced.to { - assert_eq!(to, range); - to - } else { - let (to, _) = self.allocate(range.len()); - - self.replaced.get_mut(&range.start).unwrap().to = Some(to); - - to - } + replaced.to } else if let Some(&new) = self.new.get(&range.start) { assert_eq!(new, range); new } else { - let (new, _) = self.allocate(range.len()); + let (new, _) = self.allocate_raw(range.len()); + + self.db.copy(range, new); let res = self.replaced.insert( new.start, Replaced { from: range, - to: Some(new), + to: new, }, ); debug_assert!(res.is_none()); @@ -211,28 +221,86 @@ impl<'t> TransactionHandle<'t> { new }; - (new, self.get_data(new)) + (range, &mut self.db.map[range.as_range()]) } - pub fn allocate(&mut self, length: u64) -> (FileRange, &mut [u8]) { - unsafe { - let new = self.allocate_range(length); + pub fn read_raw(&mut self, range: FileRange) -> (FileRange, &[u8]) { + let range = if let Some(&replaced) = self.replaced.get(&range.start) { + assert_eq!(replaced.from, range); + replaced.to + } else if let Some(&new) = self.new.get(&range.start) { + assert_eq!(new, range); + new + } else { + range + }; - let res = self.new.insert(new.start, new); + (range, &self.db.map[range.as_range()]) + } + + pub fn allocate_raw(&mut self, length: u64) -> (FileRange, &mut [u8]) { + unsafe { + let range = self.allocate_range(length); + + let res = self.new.insert(range.start, range); debug_assert!(res.is_none()); - (new, self.get_data(new)) + (range, &mut self.db.map[range.as_range()]) + } + } + + pub fn modify_range( + &mut self, + range: FileRange, + ) -> (FileRange, &mut T) { + unsafe { + let (ptr, _) = self.modify_raw(range); + (ptr, self.db.modify_range(ptr)) + } + } + + pub fn modify(&mut self, at: FilePointer) -> (FileRange, &mut T) { + self.modify_range(at.range(size_of::() as u64)) + } + + pub fn read_range(&mut self, range: FileRange) -> (FileRange, &T) { + unsafe { + let (ptr, _) = self.read_raw(range); + (ptr, self.db.reference_range(ptr)) + } + } + + pub fn read(&mut self, at: FilePointer) -> (FileRange, &T) { + unsafe { + let (ptr, _) = self.read_raw(at.range(size_of::() as u64)); + (ptr, self.db.reference_range(ptr)) + } + } + + pub fn allocate_size(&mut self, length: u64) -> (FileRange, &mut T) { + unsafe { + let (ptr, _) = self.allocate_raw(length); + (ptr, self.db.modify_range(ptr)) + } + } + + pub fn allocate(&mut self) -> (FileRange, &mut T) { + unsafe { + let (ptr, _) = self.allocate_raw(size_of::() as u64); + (ptr, self.db.modify_range(ptr)) } } pub fn free(&mut self, range: FileRange) { - // + self.db.free(range) } - fn allocate_range(&mut self, length: u64) -> FileRange { - let range: FileRange = todo!(); + fn allocate_range(&mut self, size: u64) -> FileRange { + self.db.allocate(size) + } - range + fn root(&self) -> FilePointer { + self.db.header().root } } @@ -263,8 +331,45 @@ impl Db { .unwrap_or_else(|| self.add_slab(size)) } - fn transaction(f: impl FnOnce(TransactionHandle)) { - // let handle = TransactionHandle {}; + fn transaction(&mut self, f: impl FnOnce(&mut TransactionHandle) -> FilePointer) { + let mut handle = TransactionHandle { + db: self, + replaced: HashMap::new(), + new: HashMap::new(), + }; + + let root = f(&mut handle); + + let to_free = handle + .replaced + .values() + .map(|replaced| replaced.from) + .collect(); + + let snapshot = self.update_root(root); + + self.snapshots + .push(SnapshotAndFreeList { snapshot, to_free }); + } + + fn free_old_epochs(&mut self) { + let mut snapshots = std::mem::take(&mut self.snapshots); + + snapshots.retain_mut(|snapshot| { + if Arc::get_mut(&mut snapshot.snapshot).is_some() { + println!("freeing epoch"); + // if the snapshot is uniqe we are the only owner and can + // free the epoch. + for allocation in std::mem::take(&mut snapshot.to_free) { + self.free(allocation); + } + false + } else { + true + } + }); + + self.snapshots = snapshots; } fn header_ptr() -> FilePointer { @@ -314,6 +419,13 @@ impl Db { })) } + // TODO: fix pls + #[track_caller] + unsafe fn copy(&mut self, from: FileRange, to: FileRange) { + let (head, tail) = self.map.split_at_mut(to.start.0.get() as usize); + tail[0..to.len() as usize].copy_from_slice(&head[from.as_range()]); + } + #[track_caller] unsafe fn read(&self, at: FilePointer) -> T { self.read_range(at.range(size_of::() as u64)) @@ -394,6 +506,15 @@ impl Db { PagePointer::nth((len / PAGE_SIZE).try_into().unwrap()) } + fn remove_pages(&mut self, n: u64) { + let len = self.file.metadata().unwrap().len(); + self.file + .set_len(len.checked_sub(PAGE_SIZE * n).unwrap()) + .unwrap(); + + self.remap(); + } + pub fn create(file: File, slabs: &[u32]) -> Self { // clear file file.set_len(0).unwrap(); @@ -409,6 +530,7 @@ impl Db { file, map, slabs: BTreeMap::new(), + snapshots: Vec::new(), }; unsafe { @@ -435,6 +557,7 @@ impl Db { file, map, slabs: BTreeMap::new(), + snapshots: Vec::new(), }; let _ = db.state.swap(Arc::new(Snapshot { @@ -468,6 +591,7 @@ impl Db { .copied() } + // TODO: scrap the PAGE-wise allocation and make slab allocations allocations of the general allocator. pub fn allocate(&mut self, size: u64) -> FileRange { if let Some(slab) = self.get_slab(size) { slab.alloc(self) @@ -652,32 +776,119 @@ mod tests { #[test] fn it_works() { - let mut db = Db::create(tempfile::tempfile().unwrap(), &[4, 16]); + let mut db = Db::create(tempfile::tempfile().unwrap(), &[4, 7, 16]); - fn alloc_and_free_many(db: &mut Db, n: u64) { - let mut ranges = Vec::new(); - for i in 1..n { - let range = db.allocate(N as u64); + let mut ranges = Vec::new(); + for i in 0..10000 { + ranges.push(db.allocate(i % 32)); + } - let data = [0; N].map(|_| i as u8); + let n = ranges.len(); + for range in ranges.drain(n / 4..n * 3 / 4) { + db.free(range); + } - unsafe { - db.write_range(range, data); - } + for i in 0..10000 { + ranges.push(db.allocate((4 * i) % 32)); + } - ranges.push(range); + for range in ranges.drain(..) { + db.free(range); + } + + // hexdump(db.map.as_bytes()); + } + + #[test] + fn transactions_work() { + #[derive(Clone, Copy, FromBytes, AsBytes, Unaligned)] + #[repr(C)] + struct DataHeader { + generation: U64, + list: FilePointer, + } + + #[derive(Clone, Copy, FromBytes, AsBytes, Unaligned)] + #[repr(C)] + struct DataList { + next: FilePointer, + data: U64, + } + + let mut db = Db::create( + tempfile::tempfile().unwrap(), + &[size_of::() as u32, size_of::() as u32], + ); + + let mut snapshots = Vec::new(); + + for i in 0..10 { + db.transaction(|transaction| { + let root = transaction.root(); + + let root = if root.is_null() { + let (root, data) = transaction.allocate::(); + + *data = DataHeader { + generation: 0.into(), + list: FilePointer::null(), + }; + + root.start + } else { + root + }; + + let (root, &data) = transaction.read::(root); + + assert_eq!(data.generation.get(), i); + + let (elem_ptr, element) = transaction.allocate::(); + + element.next = data.list; + element.data = i.into(); + + let (root, data) = transaction.modify::(root.start); + data.list = elem_ptr.start; + data.generation = (i + 1).into(); + root.start + }); + + snapshots.push(db.create_reader().state.get()); + } + + for (i, snapshot) in snapshots.iter().enumerate() { + let root = snapshot.read::(snapshot.root); + + assert_eq!(root.generation.get(), i as u64 + 1); + + let mut items = Vec::new(); + + let mut ptr = root.list; + + while !ptr.is_null() { + let element = snapshot.read::(ptr); + + items.push(element.data.get()); + ptr = element.next; } - for range in ranges.into_iter().rev() { - db.free(range); + assert_eq!(items.len(), i + 1); + for (expected, &is) in items.iter().rev().enumerate() { + assert_eq!(expected as u64, is); } } - // alloc_and_free_many::<1>(&mut db, 3 * PAGE_SIZE); - alloc_and_free_many::<4>(&mut db, PAGE_SIZE / 4 * 3); - alloc_and_free_many::<4>(&mut db, PAGE_SIZE / 4 * 3); - alloc_and_free_many::<16>(&mut db, PAGE_SIZE / 16 * 3); + drop(snapshots); + // hexdump(db.map.as_bytes()); + + db.free_old_epochs(); + + // hexdump(db.map.as_bytes()); + } + + fn hexdump(bytes: &[u8]) { let mut child = std::process::Command::new("hexdump") .arg("-C") .stdin(Stdio::piped()) @@ -687,7 +898,7 @@ mod tests { let mut stdin = child.stdin.take().expect("failed to get stdin"); - stdin.write_all(db.map.as_bytes()).unwrap(); + stdin.write_all(bytes).unwrap(); std::mem::drop(stdin);