diff --git a/src/datastructures/btree.rs b/src/datastructures/btree.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/src/datastructures/btree.rs @@ -0,0 +1 @@ + diff --git a/src/datastructures/mod.rs b/src/datastructures/mod.rs new file mode 100644 index 0000000..ecb60de --- /dev/null +++ b/src/datastructures/mod.rs @@ -0,0 +1,2 @@ +pub mod btree; +pub mod queue; diff --git a/src/datastructures/queue.rs b/src/datastructures/queue.rs new file mode 100644 index 0000000..bdd2da1 --- /dev/null +++ b/src/datastructures/queue.rs @@ -0,0 +1,151 @@ +use std::marker::PhantomData; + +use zerocopy::{AsBytes, FromBytes, FromZeroes, Unaligned}; + +use crate::ReaderTrait; +use crate::{transaction, FilePointer, TransactionHandle, U64}; + +#[derive(Clone, Copy, FromBytes, FromZeroes, AsBytes, Unaligned)] +#[repr(packed)] +pub struct Queue { + pub(crate) head: FilePointer>, + pub(crate) length: U64, + _phantom: PhantomData, +} + +impl Queue { + pub fn new(transaction: &mut TransactionHandle) -> FilePointer> { + let (queue, data) = transaction.allocate(); + *data = Self { + head: FilePointer::null(), + length: 0.into(), + _phantom: PhantomData, + }; + queue + } +} + +impl FilePointer> { + pub fn enqueue(self, transaction: &mut TransactionHandle, item: T) -> Self { + self.enqueue_many(transaction, &[item]) + } + + pub fn dequeue(self, transaction: &mut TransactionHandle) -> Option<(Self, T)> { + let mut res = None; + self.dequeue_many_inner(transaction, 1, |t| assert!(res.replace(t).is_none())) + .map(|ptr| (ptr, res.unwrap())) + } + + pub fn enqueue_many(self, transaction: &mut TransactionHandle, items: &[T]) -> Self { + if items.is_empty() { + return self; + } + + let old = *transaction.read(self); + + let mut head = old.head; + + for item in items { + let (element, data) = transaction.allocate(); + *data = QueueElement { + next: head, + data: *item, + _phantom: PhantomData, + }; + + head = element; + } + + let (queue, data) = transaction.modify(self); + *data = Queue { + head, + length: (old + .length + .get() + .checked_add(items.len().try_into().unwrap()) + .unwrap()) + .into(), + _phantom: PhantomData, + }; + queue + } + + pub fn dequeue_many( + self, + transaction: &mut TransactionHandle, + n: u64, + ) -> Option<(Self, Vec)> { + let mut res = Vec::with_capacity(n.try_into().unwrap()); + self.dequeue_many_inner(transaction, n, |t| res.push(t)) + .map(|ptr| { + res.reverse(); + (ptr, res) + }) + } + + // NOTE: calls f with the elements in reverse order. + fn dequeue_many_inner( + self, + transaction: &mut TransactionHandle, + n: u64, + mut f: impl FnMut(T), + ) -> Option { + if n == 0 { + return Some(self); + } + + let old = *transaction.read(self); + + if old.length.get() < n { + return None; + } + + let (queue, data) = transaction.modify(self); + + *data = Queue { + head: old.head, + length: (old.length.get().checked_sub(n).unwrap()).into(), + _phantom: PhantomData, + }; + + // println!("length: {}", old.length.get()); + + let mut ptr = data.head; + + let skip_n = old.length.get().checked_sub(n).unwrap(); + + // println!("skip_n: {skip_n}"); + + // skip the head + for i in 0..skip_n { + // println!("skip [{i}] {ptr:?}"); + + ptr = *transaction.read(field_ptr!(ptr, QueueElement, next)); + } + + // remove the tail + for i in 0..n { + // println!("free [{i}] {ptr:?}"); + + let element = *transaction.read(ptr); + transaction.free(ptr); + + f(element.data); + ptr = element.next; + } + + Some(queue) + } + + pub fn length(self, reader: &impl ReaderTrait) -> u64 { + reader.read(field_ptr!(self, Queue, length)).get() + } +} + +#[derive(Clone, Copy, FromBytes, FromZeroes, AsBytes, Unaligned)] +#[repr(packed)] +pub(crate) struct QueueElement { + pub(crate) next: FilePointer>, + pub(crate) data: T, + _phantom: PhantomData, +} diff --git a/src/lib.rs b/src/lib.rs index 9390df6..c963a7c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -14,7 +14,7 @@ macro_rules! field_ptr { ($base_ptr: expr, $type: ty, $field: ident) => {{ let base: FilePointer<$type> = $base_ptr; - let res = FilePointer::new(base.into_raw() + offset_of!($type, $field) as u64); + let res = FilePointer::new(base.into_raw() + ::memoffset::offset_of!($type, $field) as u64); if false { let ptr: Box<$type> = <$type as ::zerocopy::FromZeroes>::new_box_zeroed(); @@ -28,17 +28,19 @@ macro_rules! field_ptr { mod allocator; mod atomic_arc; +pub mod datastructures; mod mapped; mod transaction; #[cfg(test)] mod tests; +pub use transaction::TransactionHandle; + use allocator::{AllocatorState, GeneralPurposeAllocator, SlabListPointer, SlabPointer}; use atomic_arc::AtomicArc; +use mapped::ReaderTrait; use memmap::{Mmap, MmapMut}; -use memoffset::offset_of; -use transaction::TransactionHandle; use zerocopy::{AsBytes, FromBytes, FromZeroes, Ref, Unaligned, LE}; const PAGE_SIZE: u64 = 4096; @@ -290,6 +292,12 @@ struct Snapshot { map: Mmap, } +impl ReaderTrait for Snapshot { + fn read_raw(&self, range: FileRange) -> &[u8] { + &self.map[range.as_range()] + } +} + impl Snapshot { fn read(&self, at: FilePointer) -> &T { self.read_range(at.range()) @@ -379,20 +387,24 @@ impl Db { .unwrap_or_else(|| self.add_slab(size)) } - fn transaction(&mut self, f: impl FnOnce(&mut TransactionHandle) -> FilePointer) { + pub fn transaction(&mut self, f: impl FnOnce(&mut TransactionHandle) -> FilePointer) { let mut handle = TransactionHandle::new(self); let root = f(&mut handle); + // dbg!(&handle); + let to_free = handle.to_free(); + // dbg!(&to_free); + let snapshot = self.update_root(root); self.snapshots .push_back(SnapshotAndFreeList { snapshot, to_free }); } - fn free_old_epochs(&mut self) { + pub fn free_old_epochs(&mut self) { let mut snapshots = std::mem::take(&mut self.snapshots); while snapshots diff --git a/src/mapped.rs b/src/mapped.rs index 9c9db6b..5f35469 100644 --- a/src/mapped.rs +++ b/src/mapped.rs @@ -1,14 +1,21 @@ -use std::mem::size_of; +use std::mem::{align_of, size_of}; use zerocopy::{FromBytes, FromZeroes, Ref}; use crate::{FilePointer, FileRange, RawFilePointer}; pub trait ReaderTrait { - fn read_raw(&self, ptr: FileRange) -> &[u8]; + fn read_raw(&self, range: FileRange) -> &[u8]; fn read(&self, ptr: FilePointer) -> &T { - Ref::<_, T>::new(self.read_raw(ptr.range())) - .unwrap() + let bytes = self.read_raw(ptr.range()); + Ref::<_, T>::new(bytes) + .unwrap_or_else(|| { + panic!( + "failed to interpret {:?} = {bytes:?} as {}", + ptr.range(), + std::any::type_name::() + ) + }) .into_ref() } } diff --git a/src/tests.rs b/src/tests.rs index ed04ab8..2dd56d1 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -1,7 +1,9 @@ use crate::allocator::{div_round_up, FreeListBlock, RelativeFreeListHeader, SlabKind}; +use crate::datastructures::queue::{self, Queue, QueueElement}; use super::*; use mapped::ReaderTrait; +use rand::Rng; use std::collections::BTreeSet; use std::io::Write; use std::ops::Shl; @@ -238,7 +240,7 @@ fn allocator() { fn no_fragmentation() { use Operation::*; #[rustfmt::skip] - let mut sequence = vec![Allocate { size: 1946 }, Allocate { size: 3252 }, Free { index: 0 }, Allocate { size: 7391 }, Allocate { size: 394 }, Allocate { size: 3726 }, Allocate { size: 1429 }, Allocate { size: 3188 }, Allocate { size: 6375 }, Allocate { size: 4453 }, Allocate { size: 2514 }, Allocate { size: 4754 }, Allocate { size: 6785 }, Allocate { size: 2751 }, Allocate { size: 4107 }, Allocate { size: 3509 }, Allocate { size: 5897 }, Allocate { size: 7081 }, Allocate { size: 2419 }, Allocate { size: 5400 }, Allocate { size: 7135 }, Free { index: 14 }, Allocate { size: 2130 }, Free { index: 18 }, Allocate { size: 3450 }, Allocate { size: 1296 }, Allocate { size: 8091 }, Allocate { size: 4646 }, Allocate { size: 3891 }, Free { index: 0 }, Allocate { size: 1087 }, Allocate { size: 101 }, Allocate { size: 5353 }, Allocate { size: 3381 }, Allocate { size: 6869 }, Free { index: 1 }, Allocate { size: 3750 }, Allocate { size: 1398 }, Free { index: 22 }, Allocate { size: 18 }, Free { index: 25 }, Allocate { size: 642 }, Free { index: 4 }, Allocate { size: 4 }, Allocate { size: 1898 }, Allocate { size: 5259 }, Free { index: 26 }, Allocate { size: 3151 }, Allocate { size: 4989 }, Allocate { size: 6493 }, Allocate { size: 551 }, Allocate { size: 706 }, Allocate { size: 4161 }, Free { index: 16 }, Allocate { size: 3422 }, Allocate { size: 3011 }, Allocate { size: 5149 }, Allocate { size: 4687 }, Allocate { size: 5 }, Free { index: 34 }, Allocate { size: 191 }, Allocate { size: 2851 }, Allocate { size: 3597 }, Free { index: 28 }, Allocate { size: 7037 }, Allocate { size: 4660 }, Allocate { size: 194 }, Allocate { size: 5537 }, Allocate { size: 3242 }, Allocate { size: 6298 }, Allocate { size: 1239 }, Allocate { size: 7025 }, Allocate { size: 3563 }, Allocate { size: 5039 }, Free { index: 40 }, Allocate { size: 4549 }, Allocate { size: 5362 }, Allocate { size: 3510 }, Free { index: 31 }, Allocate { size: 226 }, Allocate { size: 6904 }, Allocate { size: 4150 }, Allocate { size: 4914 }, Allocate { size: 2330 }, Allocate { size: 2499 }, Allocate { size: 6677 }, Allocate { size: 95 }, Allocate { size: 3726 }, Allocate { size: 3258 }, Free { index: 2 }, Allocate { size: 2129 }, Allocate { size: 3674 }, Allocate { size: 1542 }, Allocate { size: 2210 }, Free { index: 21 }, Allocate { size: 3914 }, Allocate { size: 3108 }, Allocate { size: 1979 }, Allocate { size: 2677 }, Allocate { size: 8140 }, Allocate { size: 7573 }, Allocate { size: 121 }, Free { index: 59 }, Allocate { size: 6467 }, Allocate { size: 262 }, Allocate { size: 7711 }, Allocate { size: 2450 }, Allocate { size: 4351 }, Allocate { size: 4282 }, Free { index: 39 }, Allocate { size: 4050 }, Allocate { size: 67 }, Allocate { size: 5560 }, Free { index: 51 }, Allocate { size: 6038 }, Allocate { size: 555 }, Allocate { size: 1852 }, Free { index: 78 }, Allocate { size: 698 }]; + let mut sequence = vec![Allocate { size: 1946 }, Allocate { size: 3252 }, Free { index: 0 }, Allocate { size: 7391 }, Allocate { size: 394 }, Allocate { size: 3726 }, Allocate { size: 1429 }, Allocate { size: 3188 }, Allocate { size: 6375 }, Allocate { size: 4453 }, Allocate { size: 2514 }, Allocate { size: 4754 }, Allocate { size: 6785 }, Allocate { size: 2751 }, Allocate { size: 4107 }, Allocate { size: 3509 }, Allocate { size: 5897 }, Allocate { size: 7081 }, Allocate { size: 2419 }, Allocate { size: 5400 }, Allocate { size: 7135 }, Free { index: 14 }, Allocate { size: 2130 }, Free { index: 18 }, Allocate { size: 3450 }, Allocate { size: 1296 }, Allocate { size: 8091 }, Allocate { size: 4646 }, Allocate { size: 3891 }, Free { index: 0 }, Allocate { size: 1087 }, Allocate { size: 101 }, Allocate { size: 5353 }, Allocate { size: 3381 }, Allocate { size: 6869 }, Free { index: 1 }, Allocate { size: 3750 }, Allocate { size: 1398 }, Free { index: 22 }, Allocate { size: 18 }, Free { index: 25 }, Allocate { size: 642 }, Free { index: 4 }, Allocate { size: 4 }, Allocate { size: 1898 }, Allocate { size: 5259 }, Free { index: 26 }, Allocate { size: 3151 }, Allocate { size: 4989 }, Allocate { size: 6493 }, Allocate { size: 551 }, Allocate { size: 706 }, Allocate { size: 4161 }, Free { index: 16 }, Allocate { size: 3422 }, Allocate { size: 3011 }, Allocate { size: 5149 }, Allocate { size: 4687 }, Allocate { size: 5 }, Free { index: 34 }, Allocate { size: 191 }, Allocate { size: 2851 }, Allocate { size: 3597 }, Free { index: 28 }, Allocate { size: 7037 }, Allocate { size: 4660 }, Allocate { size: 194 }, Allocate { size: 5537 }, Allocate { size: 3242 }, Allocate { size: 6298 }, Allocate { size: 1239 }, Allocate { size: 7025 }, Allocate { size: 3563 }, Allocate { size: 5039 }, Free { index: 40 }, Allocate { size: 4549 }, Allocate { size: 5362 }, Allocate { size: 3510 }, Free { index: 31 }, Allocate { size: 226 }, Allocate { size: 6904 }, Allocate { size: 4150 }, Allocate { size: 4914 }, Allocate { size: 2330 }, Allocate { size: 2499 }, Allocate { size: 6677 }, Allocate { size: 95 }, Allocate { size: 3726 }, Allocate { size: 3258 }, Free { index: 2 }, Allocate { size: 2129 }, Allocate { size: 3674 }, Allocate { size: 1542 }, Allocate { size: 2210 }, Free { index: 21 }, Allocate { size: 3914 }, Allocate { size: 3108 }, Allocate { size: 1979 }, Allocate { size: 2677 }, Allocate { size: 8140 }, Allocate { size: 7573 }, Allocate { size: 121 }, Free { index: 59 }, Allocate { size: 6467 }, Allocate { size: 262 }, Allocate { size: 7711 }, Allocate { size: 2450 }, Allocate { size: 4351 }, Allocate { size: 4282 }, Free { index: 39 }, Allocate { size: 4050 }, Allocate { size: 67 }, Allocate { size: 5560 }, Free { index: 51 }, Allocate { size: 6038 }, Allocate { size: 555 }, Allocate { size: 1852 }, Free { index: 78 }, Allocate { size: 698 }]; assert!(!causes_fragmentation(&sequence, true)); } @@ -322,14 +324,14 @@ fn transactions_work() { db.free_old_epochs() } - validate_db(&db, |snaphot, coverage| { - coverage.set_allocated(snaphot.root.range()); - let data = snaphot.read(snaphot.root); + validate_db(&db, |snapshot, coverage| { + coverage.set_allocated(snapshot.root.range()); + let data = snapshot.read(snapshot.root); let mut next = data.list; while !next.is_null() { coverage.set_allocated(next.range()); - next = snaphot.read(next).next; + next = snapshot.read(next).next; } }); } @@ -656,6 +658,8 @@ impl<'c, R> CoverageMap<'c, R> { current_page.push(char::from_u32(0x2800 + byte as u32).unwrap()); } + res.push_str(¤t_page); + Self::set_color(&mut res, ""); res @@ -848,3 +852,106 @@ fn hexdump(bytes: &[u8]) { child.wait().unwrap(); } + +#[test] +fn queue() { + fn validate(db: &Db>) { + validate_db(db, |snapshot, coverage| { + let queue = snapshot.root; + coverage.set_allocated(queue.range()); + + let n = queue.length(snapshot); + + dbg!(n); + + let mut next = *snapshot.read(field_ptr!(queue, Queue, head)); + + for i in 0..n { + coverage.set_allocated(next.range()); + + next = *snapshot.read(field_ptr!(next, QueueElement, next)) + } + }) + }; + + let mut db = Db::>::create( + tempfile::tempfile().unwrap(), + &[size_of::>() as u32], + ); + + db.transaction(|transaction| Queue::new(transaction)); + db.transaction(|transaction| transaction.root().enqueue(transaction, 1.into())); + db.transaction(|transaction| transaction.root().enqueue(transaction, 2.into())); + db.transaction(|transaction| { + let (queue, res) = transaction.root().dequeue(transaction).unwrap(); + assert_eq!(res.get(), 1); + queue + }); + db.transaction(|transaction| { + let (queue, res) = transaction.root().dequeue(transaction).unwrap(); + assert_eq!(res.get(), 2); + queue + }); + + db.transaction(|transaction| { + assert!(transaction.root().dequeue(transaction).is_none()); + transaction.root() + }); + + db.free_old_epochs(); + + validate(&db); + + let mut rng = rand::thread_rng(); + + let mut j = 0; + let mut i = 0; + for _ in 0..200 { + db.transaction(|transaction| { + let mut root = transaction.root(); + + let n = rng.gen_range(1..20); + for _ in 0..n { + if rng.gen_bool(0.5) || root.length(transaction) == 0 { + let how_many = rng.gen_range(1..20); + let elements = (i..i + how_many).map(U64::from).collect::>(); + root = root.enqueue_many(transaction, &elements); + i += how_many; + } else { + let res; + (root, res) = root.dequeue(transaction).unwrap(); + assert_eq!(res.get(), j); + j += 1; + } + } + + root + }); + + if rng.gen_bool(0.5) { + db.free_old_epochs(); + } + } + + db.transaction(|transaction| { + let mut root = transaction.root(); + + let n = root.length(transaction); + + if n != 0 { + let mut res; + (root, res) = root.dequeue_many(transaction, n).unwrap(); + + for x in res { + assert_eq!(x.get(), j); + j += 1; + } + } + + root + }); + + db.free_old_epochs(); + + validate(&db); +} diff --git a/src/transaction.rs b/src/transaction.rs index e7bf2ad..4db78d2 100644 --- a/src/transaction.rs +++ b/src/transaction.rs @@ -1,10 +1,10 @@ -use std::{cell::RefCell, collections::HashMap, mem::size_of}; +use std::{cell::RefCell, collections::HashMap, fmt::Debug, mem::size_of}; use zerocopy::{AsBytes, FromBytes}; use crate::{mapped::ReaderTrait, Db, FilePointer, FileRange, RawFilePointer}; -#[derive(Clone, Copy)] +#[derive(Clone, Copy, Debug)] struct Replaced { from: FileRange, to: Option, @@ -16,6 +16,15 @@ pub struct TransactionHandle<'t, R> { new: HashMap, } +impl<'t, R> Debug for TransactionHandle<'t, R> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("TransactionHandle") + .field("replaced", &self.replaced) + .field("new", &self.new) + .finish() + } +} + impl<'t, R> ReaderTrait for TransactionHandle<'t, R> { fn read_raw(&self, ptr: FileRange) -> &[u8] { self.reference_raw(ptr) @@ -30,7 +39,7 @@ impl<'t, R> TransactionHandle<'t, R> { } } - pub fn to_free(&self) -> Vec { + pub(crate) fn to_free(&self) -> Vec { self.replaced .values() .map(|replaced| replaced.from) @@ -52,10 +61,14 @@ impl<'t, R> TransactionHandle<'t, R> { // TODO: replacing this with unwrap_or(range) // will access the original region, which can't have actually been // allocated, but was logically freed. - replaced.to.expect("use after free") + replaced.to.unwrap_or_else(|| { + dbg!(&self); + panic!("use after free at: {range:?}") + }) } else if let Some(&new) = self.new.get(&range.start) { - assert_eq!(new, range); - new + debug_assert_eq!(new.start, range.start); + assert!(new.end() >= range.end()); + new.start.range(range.len()) } else { range } @@ -80,7 +93,7 @@ impl<'t, R> TransactionHandle<'t, R> { self.db.copy_nonoverlapping(range, new); let res = self.replaced.insert( - new.start, + range.start, Replaced { from: range, to: Some(new), @@ -100,8 +113,8 @@ impl<'t, R> TransactionHandle<'t, R> { &self.db.map[range.as_range()] } - pub unsafe fn modify_raw(&mut self, range: FileRange) -> (FileRange, &mut [u8]) { - let range = self.write_ptr_raw(range); + pub fn modify_raw(&mut self, range: FileRange) -> (FileRange, &mut [u8]) { + let range = unsafe { self.write_ptr_raw(range) }; (range, &mut self.db.map[range.as_range()]) }