add queue

This commit is contained in:
soruh 2023-08-13 20:35:39 +02:00
parent eff889560b
commit 1d3b622b73
7 changed files with 316 additions and 23 deletions

View File

@ -0,0 +1 @@

View File

@ -0,0 +1,2 @@
pub mod btree;
pub mod queue;

151
src/datastructures/queue.rs Normal file
View File

@ -0,0 +1,151 @@
use std::marker::PhantomData;
use zerocopy::{AsBytes, FromBytes, FromZeroes, Unaligned};
use crate::ReaderTrait;
use crate::{transaction, FilePointer, TransactionHandle, U64};
#[derive(Clone, Copy, FromBytes, FromZeroes, AsBytes, Unaligned)]
#[repr(packed)]
pub struct Queue<T: FromBytes + FromZeroes + AsBytes + Unaligned + Clone + Copy> {
pub(crate) head: FilePointer<QueueElement<T>>,
pub(crate) length: U64,
_phantom: PhantomData<T>,
}
impl<T: FromBytes + FromZeroes + AsBytes + Unaligned + Clone + Copy> Queue<T> {
pub fn new<R>(transaction: &mut TransactionHandle<R>) -> FilePointer<Queue<T>> {
let (queue, data) = transaction.allocate();
*data = Self {
head: FilePointer::null(),
length: 0.into(),
_phantom: PhantomData,
};
queue
}
}
impl<T: FromBytes + FromZeroes + AsBytes + Unaligned + Clone + Copy> FilePointer<Queue<T>> {
pub fn enqueue<R>(self, transaction: &mut TransactionHandle<R>, item: T) -> Self {
self.enqueue_many(transaction, &[item])
}
pub fn dequeue<R>(self, transaction: &mut TransactionHandle<R>) -> Option<(Self, T)> {
let mut res = None;
self.dequeue_many_inner(transaction, 1, |t| assert!(res.replace(t).is_none()))
.map(|ptr| (ptr, res.unwrap()))
}
pub fn enqueue_many<R>(self, transaction: &mut TransactionHandle<R>, items: &[T]) -> Self {
if items.is_empty() {
return self;
}
let old = *transaction.read(self);
let mut head = old.head;
for item in items {
let (element, data) = transaction.allocate();
*data = QueueElement {
next: head,
data: *item,
_phantom: PhantomData,
};
head = element;
}
let (queue, data) = transaction.modify(self);
*data = Queue {
head,
length: (old
.length
.get()
.checked_add(items.len().try_into().unwrap())
.unwrap())
.into(),
_phantom: PhantomData,
};
queue
}
pub fn dequeue_many<R>(
self,
transaction: &mut TransactionHandle<R>,
n: u64,
) -> Option<(Self, Vec<T>)> {
let mut res = Vec::with_capacity(n.try_into().unwrap());
self.dequeue_many_inner(transaction, n, |t| res.push(t))
.map(|ptr| {
res.reverse();
(ptr, res)
})
}
// NOTE: calls f with the elements in reverse order.
fn dequeue_many_inner<R>(
self,
transaction: &mut TransactionHandle<R>,
n: u64,
mut f: impl FnMut(T),
) -> Option<Self> {
if n == 0 {
return Some(self);
}
let old = *transaction.read(self);
if old.length.get() < n {
return None;
}
let (queue, data) = transaction.modify(self);
*data = Queue {
head: old.head,
length: (old.length.get().checked_sub(n).unwrap()).into(),
_phantom: PhantomData,
};
// println!("length: {}", old.length.get());
let mut ptr = data.head;
let skip_n = old.length.get().checked_sub(n).unwrap();
// println!("skip_n: {skip_n}");
// skip the head
for i in 0..skip_n {
// println!("skip [{i}] {ptr:?}");
ptr = *transaction.read(field_ptr!(ptr, QueueElement<T>, next));
}
// remove the tail
for i in 0..n {
// println!("free [{i}] {ptr:?}");
let element = *transaction.read(ptr);
transaction.free(ptr);
f(element.data);
ptr = element.next;
}
Some(queue)
}
pub fn length(self, reader: &impl ReaderTrait) -> u64 {
reader.read(field_ptr!(self, Queue<T>, length)).get()
}
}
#[derive(Clone, Copy, FromBytes, FromZeroes, AsBytes, Unaligned)]
#[repr(packed)]
pub(crate) struct QueueElement<T: FromBytes + FromZeroes + AsBytes + Unaligned> {
pub(crate) next: FilePointer<QueueElement<T>>,
pub(crate) data: T,
_phantom: PhantomData<T>,
}

View File

@ -14,7 +14,7 @@ macro_rules! field_ptr {
($base_ptr: expr, $type: ty, $field: ident) => {{
let base: FilePointer<$type> = $base_ptr;
let res = FilePointer::new(base.into_raw() + offset_of!($type, $field) as u64);
let res = FilePointer::new(base.into_raw() + ::memoffset::offset_of!($type, $field) as u64);
if false {
let ptr: Box<$type> = <$type as ::zerocopy::FromZeroes>::new_box_zeroed();
@ -28,17 +28,19 @@ macro_rules! field_ptr {
mod allocator;
mod atomic_arc;
pub mod datastructures;
mod mapped;
mod transaction;
#[cfg(test)]
mod tests;
pub use transaction::TransactionHandle;
use allocator::{AllocatorState, GeneralPurposeAllocator, SlabListPointer, SlabPointer};
use atomic_arc::AtomicArc;
use mapped::ReaderTrait;
use memmap::{Mmap, MmapMut};
use memoffset::offset_of;
use transaction::TransactionHandle;
use zerocopy::{AsBytes, FromBytes, FromZeroes, Ref, Unaligned, LE};
const PAGE_SIZE: u64 = 4096;
@ -290,6 +292,12 @@ struct Snapshot<R> {
map: Mmap,
}
impl<R> ReaderTrait for Snapshot<R> {
fn read_raw(&self, range: FileRange) -> &[u8] {
&self.map[range.as_range()]
}
}
impl<R> Snapshot<R> {
fn read<T: FromBytes>(&self, at: FilePointer<T>) -> &T {
self.read_range(at.range())
@ -379,20 +387,24 @@ impl<R> Db<R> {
.unwrap_or_else(|| self.add_slab(size))
}
fn transaction(&mut self, f: impl FnOnce(&mut TransactionHandle<R>) -> FilePointer<R>) {
pub fn transaction(&mut self, f: impl FnOnce(&mut TransactionHandle<R>) -> FilePointer<R>) {
let mut handle = TransactionHandle::new(self);
let root = f(&mut handle);
// dbg!(&handle);
let to_free = handle.to_free();
// dbg!(&to_free);
let snapshot = self.update_root(root);
self.snapshots
.push_back(SnapshotAndFreeList { snapshot, to_free });
}
fn free_old_epochs(&mut self) {
pub fn free_old_epochs(&mut self) {
let mut snapshots = std::mem::take(&mut self.snapshots);
while snapshots

View File

@ -1,14 +1,21 @@
use std::mem::size_of;
use std::mem::{align_of, size_of};
use zerocopy::{FromBytes, FromZeroes, Ref};
use crate::{FilePointer, FileRange, RawFilePointer};
pub trait ReaderTrait {
fn read_raw(&self, ptr: FileRange) -> &[u8];
fn read_raw(&self, range: FileRange) -> &[u8];
fn read<T: FromBytes>(&self, ptr: FilePointer<T>) -> &T {
Ref::<_, T>::new(self.read_raw(ptr.range()))
.unwrap()
let bytes = self.read_raw(ptr.range());
Ref::<_, T>::new(bytes)
.unwrap_or_else(|| {
panic!(
"failed to interpret {:?} = {bytes:?} as {}",
ptr.range(),
std::any::type_name::<T>()
)
})
.into_ref()
}
}

View File

@ -1,7 +1,9 @@
use crate::allocator::{div_round_up, FreeListBlock, RelativeFreeListHeader, SlabKind};
use crate::datastructures::queue::{self, Queue, QueueElement};
use super::*;
use mapped::ReaderTrait;
use rand::Rng;
use std::collections::BTreeSet;
use std::io::Write;
use std::ops::Shl;
@ -238,7 +240,7 @@ fn allocator() {
fn no_fragmentation() {
use Operation::*;
#[rustfmt::skip]
let mut sequence = vec![Allocate { size: 1946 }, Allocate { size: 3252 }, Free { index: 0 }, Allocate { size: 7391 }, Allocate { size: 394 }, Allocate { size: 3726 }, Allocate { size: 1429 }, Allocate { size: 3188 }, Allocate { size: 6375 }, Allocate { size: 4453 }, Allocate { size: 2514 }, Allocate { size: 4754 }, Allocate { size: 6785 }, Allocate { size: 2751 }, Allocate { size: 4107 }, Allocate { size: 3509 }, Allocate { size: 5897 }, Allocate { size: 7081 }, Allocate { size: 2419 }, Allocate { size: 5400 }, Allocate { size: 7135 }, Free { index: 14 }, Allocate { size: 2130 }, Free { index: 18 }, Allocate { size: 3450 }, Allocate { size: 1296 }, Allocate { size: 8091 }, Allocate { size: 4646 }, Allocate { size: 3891 }, Free { index: 0 }, Allocate { size: 1087 }, Allocate { size: 101 }, Allocate { size: 5353 }, Allocate { size: 3381 }, Allocate { size: 6869 }, Free { index: 1 }, Allocate { size: 3750 }, Allocate { size: 1398 }, Free { index: 22 }, Allocate { size: 18 }, Free { index: 25 }, Allocate { size: 642 }, Free { index: 4 }, Allocate { size: 4 }, Allocate { size: 1898 }, Allocate { size: 5259 }, Free { index: 26 }, Allocate { size: 3151 }, Allocate { size: 4989 }, Allocate { size: 6493 }, Allocate { size: 551 }, Allocate { size: 706 }, Allocate { size: 4161 }, Free { index: 16 }, Allocate { size: 3422 }, Allocate { size: 3011 }, Allocate { size: 5149 }, Allocate { size: 4687 }, Allocate { size: 5 }, Free { index: 34 }, Allocate { size: 191 }, Allocate { size: 2851 }, Allocate { size: 3597 }, Free { index: 28 }, Allocate { size: 7037 }, Allocate { size: 4660 }, Allocate { size: 194 }, Allocate { size: 5537 }, Allocate { size: 3242 }, Allocate { size: 6298 }, Allocate { size: 1239 }, Allocate { size: 7025 }, Allocate { size: 3563 }, Allocate { size: 5039 }, Free { index: 40 }, Allocate { size: 4549 }, Allocate { size: 5362 }, Allocate { size: 3510 }, Free { index: 31 }, Allocate { size: 226 }, Allocate { size: 6904 }, Allocate { size: 4150 }, Allocate { size: 4914 }, Allocate { size: 2330 }, Allocate { size: 2499 }, Allocate { size: 6677 }, Allocate { size: 95 }, Allocate { size: 3726 }, Allocate { size: 3258 }, Free { index: 2 }, Allocate { size: 2129 }, Allocate { size: 3674 }, Allocate { size: 1542 }, Allocate { size: 2210 }, Free { index: 21 }, Allocate { size: 3914 }, Allocate { size: 3108 }, Allocate { size: 1979 }, Allocate { size: 2677 }, Allocate { size: 8140 }, Allocate { size: 7573 }, Allocate { size: 121 }, Free { index: 59 }, Allocate { size: 6467 }, Allocate { size: 262 }, Allocate { size: 7711 }, Allocate { size: 2450 }, Allocate { size: 4351 }, Allocate { size: 4282 }, Free { index: 39 }, Allocate { size: 4050 }, Allocate { size: 67 }, Allocate { size: 5560 }, Free { index: 51 }, Allocate { size: 6038 }, Allocate { size: 555 }, Allocate { size: 1852 }, Free { index: 78 }, Allocate { size: 698 }];
let mut sequence = vec![Allocate { size: 1946 }, Allocate { size: 3252 }, Free { index: 0 }, Allocate { size: 7391 }, Allocate { size: 394 }, Allocate { size: 3726 }, Allocate { size: 1429 }, Allocate { size: 3188 }, Allocate { size: 6375 }, Allocate { size: 4453 }, Allocate { size: 2514 }, Allocate { size: 4754 }, Allocate { size: 6785 }, Allocate { size: 2751 }, Allocate { size: 4107 }, Allocate { size: 3509 }, Allocate { size: 5897 }, Allocate { size: 7081 }, Allocate { size: 2419 }, Allocate { size: 5400 }, Allocate { size: 7135 }, Free { index: 14 }, Allocate { size: 2130 }, Free { index: 18 }, Allocate { size: 3450 }, Allocate { size: 1296 }, Allocate { size: 8091 }, Allocate { size: 4646 }, Allocate { size: 3891 }, Free { index: 0 }, Allocate { size: 1087 }, Allocate { size: 101 }, Allocate { size: 5353 }, Allocate { size: 3381 }, Allocate { size: 6869 }, Free { index: 1 }, Allocate { size: 3750 }, Allocate { size: 1398 }, Free { index: 22 }, Allocate { size: 18 }, Free { index: 25 }, Allocate { size: 642 }, Free { index: 4 }, Allocate { size: 4 }, Allocate { size: 1898 }, Allocate { size: 5259 }, Free { index: 26 }, Allocate { size: 3151 }, Allocate { size: 4989 }, Allocate { size: 6493 }, Allocate { size: 551 }, Allocate { size: 706 }, Allocate { size: 4161 }, Free { index: 16 }, Allocate { size: 3422 }, Allocate { size: 3011 }, Allocate { size: 5149 }, Allocate { size: 4687 }, Allocate { size: 5 }, Free { index: 34 }, Allocate { size: 191 }, Allocate { size: 2851 }, Allocate { size: 3597 }, Free { index: 28 }, Allocate { size: 7037 }, Allocate { size: 4660 }, Allocate { size: 194 }, Allocate { size: 5537 }, Allocate { size: 3242 }, Allocate { size: 6298 }, Allocate { size: 1239 }, Allocate { size: 7025 }, Allocate { size: 3563 }, Allocate { size: 5039 }, Free { index: 40 }, Allocate { size: 4549 }, Allocate { size: 5362 }, Allocate { size: 3510 }, Free { index: 31 }, Allocate { size: 226 }, Allocate { size: 6904 }, Allocate { size: 4150 }, Allocate { size: 4914 }, Allocate { size: 2330 }, Allocate { size: 2499 }, Allocate { size: 6677 }, Allocate { size: 95 }, Allocate { size: 3726 }, Allocate { size: 3258 }, Free { index: 2 }, Allocate { size: 2129 }, Allocate { size: 3674 }, Allocate { size: 1542 }, Allocate { size: 2210 }, Free { index: 21 }, Allocate { size: 3914 }, Allocate { size: 3108 }, Allocate { size: 1979 }, Allocate { size: 2677 }, Allocate { size: 8140 }, Allocate { size: 7573 }, Allocate { size: 121 }, Free { index: 59 }, Allocate { size: 6467 }, Allocate { size: 262 }, Allocate { size: 7711 }, Allocate { size: 2450 }, Allocate { size: 4351 }, Allocate { size: 4282 }, Free { index: 39 }, Allocate { size: 4050 }, Allocate { size: 67 }, Allocate { size: 5560 }, Free { index: 51 }, Allocate { size: 6038 }, Allocate { size: 555 }, Allocate { size: 1852 }, Free { index: 78 }, Allocate { size: 698 }];
assert!(!causes_fragmentation(&sequence, true));
}
@ -322,14 +324,14 @@ fn transactions_work() {
db.free_old_epochs()
}
validate_db(&db, |snaphot, coverage| {
coverage.set_allocated(snaphot.root.range());
let data = snaphot.read(snaphot.root);
validate_db(&db, |snapshot, coverage| {
coverage.set_allocated(snapshot.root.range());
let data = snapshot.read(snapshot.root);
let mut next = data.list;
while !next.is_null() {
coverage.set_allocated(next.range());
next = snaphot.read(next).next;
next = snapshot.read(next).next;
}
});
}
@ -656,6 +658,8 @@ impl<'c, R> CoverageMap<'c, R> {
current_page.push(char::from_u32(0x2800 + byte as u32).unwrap());
}
res.push_str(&current_page);
Self::set_color(&mut res, "");
res
@ -848,3 +852,106 @@ fn hexdump(bytes: &[u8]) {
child.wait().unwrap();
}
#[test]
fn queue() {
fn validate(db: &Db<Queue<U64>>) {
validate_db(db, |snapshot, coverage| {
let queue = snapshot.root;
coverage.set_allocated(queue.range());
let n = queue.length(snapshot);
dbg!(n);
let mut next = *snapshot.read(field_ptr!(queue, Queue<U64>, head));
for i in 0..n {
coverage.set_allocated(next.range());
next = *snapshot.read(field_ptr!(next, QueueElement<U64>, next))
}
})
};
let mut db = Db::<Queue<U64>>::create(
tempfile::tempfile().unwrap(),
&[size_of::<QueueElement<U64>>() as u32],
);
db.transaction(|transaction| Queue::new(transaction));
db.transaction(|transaction| transaction.root().enqueue(transaction, 1.into()));
db.transaction(|transaction| transaction.root().enqueue(transaction, 2.into()));
db.transaction(|transaction| {
let (queue, res) = transaction.root().dequeue(transaction).unwrap();
assert_eq!(res.get(), 1);
queue
});
db.transaction(|transaction| {
let (queue, res) = transaction.root().dequeue(transaction).unwrap();
assert_eq!(res.get(), 2);
queue
});
db.transaction(|transaction| {
assert!(transaction.root().dequeue(transaction).is_none());
transaction.root()
});
db.free_old_epochs();
validate(&db);
let mut rng = rand::thread_rng();
let mut j = 0;
let mut i = 0;
for _ in 0..200 {
db.transaction(|transaction| {
let mut root = transaction.root();
let n = rng.gen_range(1..20);
for _ in 0..n {
if rng.gen_bool(0.5) || root.length(transaction) == 0 {
let how_many = rng.gen_range(1..20);
let elements = (i..i + how_many).map(U64::from).collect::<Vec<U64>>();
root = root.enqueue_many(transaction, &elements);
i += how_many;
} else {
let res;
(root, res) = root.dequeue(transaction).unwrap();
assert_eq!(res.get(), j);
j += 1;
}
}
root
});
if rng.gen_bool(0.5) {
db.free_old_epochs();
}
}
db.transaction(|transaction| {
let mut root = transaction.root();
let n = root.length(transaction);
if n != 0 {
let mut res;
(root, res) = root.dequeue_many(transaction, n).unwrap();
for x in res {
assert_eq!(x.get(), j);
j += 1;
}
}
root
});
db.free_old_epochs();
validate(&db);
}

View File

@ -1,10 +1,10 @@
use std::{cell::RefCell, collections::HashMap, mem::size_of};
use std::{cell::RefCell, collections::HashMap, fmt::Debug, mem::size_of};
use zerocopy::{AsBytes, FromBytes};
use crate::{mapped::ReaderTrait, Db, FilePointer, FileRange, RawFilePointer};
#[derive(Clone, Copy)]
#[derive(Clone, Copy, Debug)]
struct Replaced {
from: FileRange,
to: Option<FileRange>,
@ -16,6 +16,15 @@ pub struct TransactionHandle<'t, R> {
new: HashMap<RawFilePointer, FileRange>,
}
impl<'t, R> Debug for TransactionHandle<'t, R> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("TransactionHandle")
.field("replaced", &self.replaced)
.field("new", &self.new)
.finish()
}
}
impl<'t, R> ReaderTrait for TransactionHandle<'t, R> {
fn read_raw(&self, ptr: FileRange) -> &[u8] {
self.reference_raw(ptr)
@ -30,7 +39,7 @@ impl<'t, R> TransactionHandle<'t, R> {
}
}
pub fn to_free(&self) -> Vec<FileRange> {
pub(crate) fn to_free(&self) -> Vec<FileRange> {
self.replaced
.values()
.map(|replaced| replaced.from)
@ -52,10 +61,14 @@ impl<'t, R> TransactionHandle<'t, R> {
// TODO: replacing this with unwrap_or(range)
// will access the original region, which can't have actually been
// allocated, but was logically freed.
replaced.to.expect("use after free")
replaced.to.unwrap_or_else(|| {
dbg!(&self);
panic!("use after free at: {range:?}")
})
} else if let Some(&new) = self.new.get(&range.start) {
assert_eq!(new, range);
new
debug_assert_eq!(new.start, range.start);
assert!(new.end() >= range.end());
new.start.range(range.len())
} else {
range
}
@ -80,7 +93,7 @@ impl<'t, R> TransactionHandle<'t, R> {
self.db.copy_nonoverlapping(range, new);
let res = self.replaced.insert(
new.start,
range.start,
Replaced {
from: range,
to: Some(new),
@ -100,8 +113,8 @@ impl<'t, R> TransactionHandle<'t, R> {
&self.db.map[range.as_range()]
}
pub unsafe fn modify_raw(&mut self, range: FileRange) -> (FileRange, &mut [u8]) {
let range = self.write_ptr_raw(range);
pub fn modify_raw(&mut self, range: FileRange) -> (FileRange, &mut [u8]) {
let range = unsafe { self.write_ptr_raw(range) };
(range, &mut self.db.map[range.as_range()])
}