basic transactions
This commit is contained in:
parent
43f85073ef
commit
897af34e37
@ -17,10 +17,10 @@ impl SlabKind {
|
|||||||
Self::SingleBytes
|
Self::SingleBytes
|
||||||
} else if size < size_of::<FilePointer>() as u32 {
|
} else if size < size_of::<FilePointer>() as u32 {
|
||||||
Self::RelativeFreeList
|
Self::RelativeFreeList
|
||||||
} else if (size as u64) <= PAGE_SIZE {
|
} else if (size as u64) <= PAGE_SIZE / 2 {
|
||||||
// TODO
|
// TODO
|
||||||
// slabs of really big object are very inefficient.
|
// slabs of really big objects are very inefficient.
|
||||||
// find a better way/ allocate more pages at once e.g. at least 10 elements?
|
// find a better way/ allocate e.g. more pages at once or at least 10 elements?
|
||||||
Self::AbsoluteFreeList
|
Self::AbsoluteFreeList
|
||||||
} else {
|
} else {
|
||||||
panic!("invalid size")
|
panic!("invalid size")
|
||||||
|
311
src/lib.rs
311
src/lib.rs
@ -151,6 +151,22 @@ struct Snapshot {
|
|||||||
map: Mmap,
|
map: Mmap,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl Snapshot {
|
||||||
|
fn read<T: FromBytes>(&self, at: FilePointer) -> &T {
|
||||||
|
self.read_range(at.range(size_of::<T>() as u64))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn read_range<T: FromBytes>(&self, range: FileRange) -> &T {
|
||||||
|
LayoutVerified::<_, T>::new(&self.map[range.as_range()])
|
||||||
|
.unwrap()
|
||||||
|
.into_ref()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn read_raw(&self, range: FileRange) -> &[u8] {
|
||||||
|
&self.map[range.as_range()]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub struct Reader {
|
pub struct Reader {
|
||||||
state: Arc<AtomicArc<Snapshot>>,
|
state: Arc<AtomicArc<Snapshot>>,
|
||||||
}
|
}
|
||||||
@ -160,12 +176,18 @@ pub struct Db {
|
|||||||
map: MmapMut,
|
map: MmapMut,
|
||||||
slabs: BTreeMap<u32, SlabPointer>,
|
slabs: BTreeMap<u32, SlabPointer>,
|
||||||
state: Arc<AtomicArc<Snapshot>>,
|
state: Arc<AtomicArc<Snapshot>>,
|
||||||
|
snapshots: Vec<SnapshotAndFreeList>,
|
||||||
|
}
|
||||||
|
|
||||||
|
struct SnapshotAndFreeList {
|
||||||
|
snapshot: Arc<Snapshot>,
|
||||||
|
to_free: Vec<FileRange>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Copy)]
|
#[derive(Clone, Copy)]
|
||||||
struct Replaced {
|
struct Replaced {
|
||||||
from: FileRange,
|
from: FileRange,
|
||||||
to: Option<FileRange>,
|
to: FileRange,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct TransactionHandle<'t> {
|
pub struct TransactionHandle<'t> {
|
||||||
@ -175,35 +197,23 @@ pub struct TransactionHandle<'t> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl<'t> TransactionHandle<'t> {
|
impl<'t> TransactionHandle<'t> {
|
||||||
unsafe fn get_data(&mut self, range: FileRange) -> &mut [u8] {
|
pub unsafe fn modify_raw(&mut self, range: FileRange) -> (FileRange, &mut [u8]) {
|
||||||
&mut self.db.map[range.as_range()]
|
let range = if let Some(&replaced) = self.replaced.get(&range.start) {
|
||||||
}
|
|
||||||
|
|
||||||
pub unsafe fn modify(&mut self, range: FileRange) -> (FileRange, &mut [u8]) {
|
|
||||||
let new = if let Some(&replaced) = self.replaced.get(&range.start) {
|
|
||||||
assert_eq!(replaced.from, range);
|
assert_eq!(replaced.from, range);
|
||||||
|
replaced.to
|
||||||
if let Some(to) = replaced.to {
|
|
||||||
assert_eq!(to, range);
|
|
||||||
to
|
|
||||||
} else {
|
|
||||||
let (to, _) = self.allocate(range.len());
|
|
||||||
|
|
||||||
self.replaced.get_mut(&range.start).unwrap().to = Some(to);
|
|
||||||
|
|
||||||
to
|
|
||||||
}
|
|
||||||
} else if let Some(&new) = self.new.get(&range.start) {
|
} else if let Some(&new) = self.new.get(&range.start) {
|
||||||
assert_eq!(new, range);
|
assert_eq!(new, range);
|
||||||
new
|
new
|
||||||
} else {
|
} else {
|
||||||
let (new, _) = self.allocate(range.len());
|
let (new, _) = self.allocate_raw(range.len());
|
||||||
|
|
||||||
|
self.db.copy(range, new);
|
||||||
|
|
||||||
let res = self.replaced.insert(
|
let res = self.replaced.insert(
|
||||||
new.start,
|
new.start,
|
||||||
Replaced {
|
Replaced {
|
||||||
from: range,
|
from: range,
|
||||||
to: Some(new),
|
to: new,
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
debug_assert!(res.is_none());
|
debug_assert!(res.is_none());
|
||||||
@ -211,28 +221,86 @@ impl<'t> TransactionHandle<'t> {
|
|||||||
new
|
new
|
||||||
};
|
};
|
||||||
|
|
||||||
(new, self.get_data(new))
|
(range, &mut self.db.map[range.as_range()])
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn allocate(&mut self, length: u64) -> (FileRange, &mut [u8]) {
|
pub fn read_raw(&mut self, range: FileRange) -> (FileRange, &[u8]) {
|
||||||
unsafe {
|
let range = if let Some(&replaced) = self.replaced.get(&range.start) {
|
||||||
let new = self.allocate_range(length);
|
assert_eq!(replaced.from, range);
|
||||||
|
replaced.to
|
||||||
|
} else if let Some(&new) = self.new.get(&range.start) {
|
||||||
|
assert_eq!(new, range);
|
||||||
|
new
|
||||||
|
} else {
|
||||||
|
range
|
||||||
|
};
|
||||||
|
|
||||||
let res = self.new.insert(new.start, new);
|
(range, &self.db.map[range.as_range()])
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn allocate_raw(&mut self, length: u64) -> (FileRange, &mut [u8]) {
|
||||||
|
unsafe {
|
||||||
|
let range = self.allocate_range(length);
|
||||||
|
|
||||||
|
let res = self.new.insert(range.start, range);
|
||||||
debug_assert!(res.is_none());
|
debug_assert!(res.is_none());
|
||||||
|
|
||||||
(new, self.get_data(new))
|
(range, &mut self.db.map[range.as_range()])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn modify_range<T: FromBytes + AsBytes>(
|
||||||
|
&mut self,
|
||||||
|
range: FileRange,
|
||||||
|
) -> (FileRange, &mut T) {
|
||||||
|
unsafe {
|
||||||
|
let (ptr, _) = self.modify_raw(range);
|
||||||
|
(ptr, self.db.modify_range(ptr))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn modify<T: FromBytes + AsBytes>(&mut self, at: FilePointer) -> (FileRange, &mut T) {
|
||||||
|
self.modify_range(at.range(size_of::<T>() as u64))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn read_range<T: FromBytes>(&mut self, range: FileRange) -> (FileRange, &T) {
|
||||||
|
unsafe {
|
||||||
|
let (ptr, _) = self.read_raw(range);
|
||||||
|
(ptr, self.db.reference_range(ptr))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn read<T: FromBytes>(&mut self, at: FilePointer) -> (FileRange, &T) {
|
||||||
|
unsafe {
|
||||||
|
let (ptr, _) = self.read_raw(at.range(size_of::<T>() as u64));
|
||||||
|
(ptr, self.db.reference_range(ptr))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn allocate_size<T: FromBytes + AsBytes>(&mut self, length: u64) -> (FileRange, &mut T) {
|
||||||
|
unsafe {
|
||||||
|
let (ptr, _) = self.allocate_raw(length);
|
||||||
|
(ptr, self.db.modify_range(ptr))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn allocate<T: FromBytes + AsBytes>(&mut self) -> (FileRange, &mut T) {
|
||||||
|
unsafe {
|
||||||
|
let (ptr, _) = self.allocate_raw(size_of::<T>() as u64);
|
||||||
|
(ptr, self.db.modify_range(ptr))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn free(&mut self, range: FileRange) {
|
pub fn free(&mut self, range: FileRange) {
|
||||||
//
|
self.db.free(range)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn allocate_range(&mut self, length: u64) -> FileRange {
|
fn allocate_range(&mut self, size: u64) -> FileRange {
|
||||||
let range: FileRange = todo!();
|
self.db.allocate(size)
|
||||||
|
}
|
||||||
|
|
||||||
range
|
fn root(&self) -> FilePointer {
|
||||||
|
self.db.header().root
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -263,8 +331,45 @@ impl Db {
|
|||||||
.unwrap_or_else(|| self.add_slab(size))
|
.unwrap_or_else(|| self.add_slab(size))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn transaction(f: impl FnOnce(TransactionHandle)) {
|
fn transaction(&mut self, f: impl FnOnce(&mut TransactionHandle) -> FilePointer) {
|
||||||
// let handle = TransactionHandle {};
|
let mut handle = TransactionHandle {
|
||||||
|
db: self,
|
||||||
|
replaced: HashMap::new(),
|
||||||
|
new: HashMap::new(),
|
||||||
|
};
|
||||||
|
|
||||||
|
let root = f(&mut handle);
|
||||||
|
|
||||||
|
let to_free = handle
|
||||||
|
.replaced
|
||||||
|
.values()
|
||||||
|
.map(|replaced| replaced.from)
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
let snapshot = self.update_root(root);
|
||||||
|
|
||||||
|
self.snapshots
|
||||||
|
.push(SnapshotAndFreeList { snapshot, to_free });
|
||||||
|
}
|
||||||
|
|
||||||
|
fn free_old_epochs(&mut self) {
|
||||||
|
let mut snapshots = std::mem::take(&mut self.snapshots);
|
||||||
|
|
||||||
|
snapshots.retain_mut(|snapshot| {
|
||||||
|
if Arc::get_mut(&mut snapshot.snapshot).is_some() {
|
||||||
|
println!("freeing epoch");
|
||||||
|
// if the snapshot is uniqe we are the only owner and can
|
||||||
|
// free the epoch.
|
||||||
|
for allocation in std::mem::take(&mut snapshot.to_free) {
|
||||||
|
self.free(allocation);
|
||||||
|
}
|
||||||
|
false
|
||||||
|
} else {
|
||||||
|
true
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
self.snapshots = snapshots;
|
||||||
}
|
}
|
||||||
|
|
||||||
fn header_ptr() -> FilePointer {
|
fn header_ptr() -> FilePointer {
|
||||||
@ -314,6 +419,13 @@ impl Db {
|
|||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: fix pls
|
||||||
|
#[track_caller]
|
||||||
|
unsafe fn copy(&mut self, from: FileRange, to: FileRange) {
|
||||||
|
let (head, tail) = self.map.split_at_mut(to.start.0.get() as usize);
|
||||||
|
tail[0..to.len() as usize].copy_from_slice(&head[from.as_range()]);
|
||||||
|
}
|
||||||
|
|
||||||
#[track_caller]
|
#[track_caller]
|
||||||
unsafe fn read<T: FromBytes>(&self, at: FilePointer) -> T {
|
unsafe fn read<T: FromBytes>(&self, at: FilePointer) -> T {
|
||||||
self.read_range(at.range(size_of::<T>() as u64))
|
self.read_range(at.range(size_of::<T>() as u64))
|
||||||
@ -394,6 +506,15 @@ impl Db {
|
|||||||
PagePointer::nth((len / PAGE_SIZE).try_into().unwrap())
|
PagePointer::nth((len / PAGE_SIZE).try_into().unwrap())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn remove_pages(&mut self, n: u64) {
|
||||||
|
let len = self.file.metadata().unwrap().len();
|
||||||
|
self.file
|
||||||
|
.set_len(len.checked_sub(PAGE_SIZE * n).unwrap())
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
self.remap();
|
||||||
|
}
|
||||||
|
|
||||||
pub fn create(file: File, slabs: &[u32]) -> Self {
|
pub fn create(file: File, slabs: &[u32]) -> Self {
|
||||||
// clear file
|
// clear file
|
||||||
file.set_len(0).unwrap();
|
file.set_len(0).unwrap();
|
||||||
@ -409,6 +530,7 @@ impl Db {
|
|||||||
file,
|
file,
|
||||||
map,
|
map,
|
||||||
slabs: BTreeMap::new(),
|
slabs: BTreeMap::new(),
|
||||||
|
snapshots: Vec::new(),
|
||||||
};
|
};
|
||||||
|
|
||||||
unsafe {
|
unsafe {
|
||||||
@ -435,6 +557,7 @@ impl Db {
|
|||||||
file,
|
file,
|
||||||
map,
|
map,
|
||||||
slabs: BTreeMap::new(),
|
slabs: BTreeMap::new(),
|
||||||
|
snapshots: Vec::new(),
|
||||||
};
|
};
|
||||||
|
|
||||||
let _ = db.state.swap(Arc::new(Snapshot {
|
let _ = db.state.swap(Arc::new(Snapshot {
|
||||||
@ -468,6 +591,7 @@ impl Db {
|
|||||||
.copied()
|
.copied()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: scrap the PAGE-wise allocation and make slab allocations allocations of the general allocator.
|
||||||
pub fn allocate(&mut self, size: u64) -> FileRange {
|
pub fn allocate(&mut self, size: u64) -> FileRange {
|
||||||
if let Some(slab) = self.get_slab(size) {
|
if let Some(slab) = self.get_slab(size) {
|
||||||
slab.alloc(self)
|
slab.alloc(self)
|
||||||
@ -652,32 +776,119 @@ mod tests {
|
|||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn it_works() {
|
fn it_works() {
|
||||||
let mut db = Db::create(tempfile::tempfile().unwrap(), &[4, 16]);
|
let mut db = Db::create(tempfile::tempfile().unwrap(), &[4, 7, 16]);
|
||||||
|
|
||||||
fn alloc_and_free_many<const N: usize>(db: &mut Db, n: u64) {
|
|
||||||
let mut ranges = Vec::new();
|
let mut ranges = Vec::new();
|
||||||
for i in 1..n {
|
for i in 0..10000 {
|
||||||
let range = db.allocate(N as u64);
|
ranges.push(db.allocate(i % 32));
|
||||||
|
|
||||||
let data = [0; N].map(|_| i as u8);
|
|
||||||
|
|
||||||
unsafe {
|
|
||||||
db.write_range(range, data);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
ranges.push(range);
|
let n = ranges.len();
|
||||||
}
|
for range in ranges.drain(n / 4..n * 3 / 4) {
|
||||||
|
|
||||||
for range in ranges.into_iter().rev() {
|
|
||||||
db.free(range);
|
db.free(range);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for i in 0..10000 {
|
||||||
|
ranges.push(db.allocate((4 * i) % 32));
|
||||||
}
|
}
|
||||||
|
|
||||||
// alloc_and_free_many::<1>(&mut db, 3 * PAGE_SIZE);
|
for range in ranges.drain(..) {
|
||||||
alloc_and_free_many::<4>(&mut db, PAGE_SIZE / 4 * 3);
|
db.free(range);
|
||||||
alloc_and_free_many::<4>(&mut db, PAGE_SIZE / 4 * 3);
|
}
|
||||||
alloc_and_free_many::<16>(&mut db, PAGE_SIZE / 16 * 3);
|
|
||||||
|
|
||||||
|
// hexdump(db.map.as_bytes());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn transactions_work() {
|
||||||
|
#[derive(Clone, Copy, FromBytes, AsBytes, Unaligned)]
|
||||||
|
#[repr(C)]
|
||||||
|
struct DataHeader {
|
||||||
|
generation: U64,
|
||||||
|
list: FilePointer,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Copy, FromBytes, AsBytes, Unaligned)]
|
||||||
|
#[repr(C)]
|
||||||
|
struct DataList {
|
||||||
|
next: FilePointer,
|
||||||
|
data: U64,
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut db = Db::create(
|
||||||
|
tempfile::tempfile().unwrap(),
|
||||||
|
&[size_of::<DataHeader>() as u32, size_of::<DataList>() as u32],
|
||||||
|
);
|
||||||
|
|
||||||
|
let mut snapshots = Vec::new();
|
||||||
|
|
||||||
|
for i in 0..10 {
|
||||||
|
db.transaction(|transaction| {
|
||||||
|
let root = transaction.root();
|
||||||
|
|
||||||
|
let root = if root.is_null() {
|
||||||
|
let (root, data) = transaction.allocate::<DataHeader>();
|
||||||
|
|
||||||
|
*data = DataHeader {
|
||||||
|
generation: 0.into(),
|
||||||
|
list: FilePointer::null(),
|
||||||
|
};
|
||||||
|
|
||||||
|
root.start
|
||||||
|
} else {
|
||||||
|
root
|
||||||
|
};
|
||||||
|
|
||||||
|
let (root, &data) = transaction.read::<DataHeader>(root);
|
||||||
|
|
||||||
|
assert_eq!(data.generation.get(), i);
|
||||||
|
|
||||||
|
let (elem_ptr, element) = transaction.allocate::<DataList>();
|
||||||
|
|
||||||
|
element.next = data.list;
|
||||||
|
element.data = i.into();
|
||||||
|
|
||||||
|
let (root, data) = transaction.modify::<DataHeader>(root.start);
|
||||||
|
data.list = elem_ptr.start;
|
||||||
|
data.generation = (i + 1).into();
|
||||||
|
root.start
|
||||||
|
});
|
||||||
|
|
||||||
|
snapshots.push(db.create_reader().state.get());
|
||||||
|
}
|
||||||
|
|
||||||
|
for (i, snapshot) in snapshots.iter().enumerate() {
|
||||||
|
let root = snapshot.read::<DataHeader>(snapshot.root);
|
||||||
|
|
||||||
|
assert_eq!(root.generation.get(), i as u64 + 1);
|
||||||
|
|
||||||
|
let mut items = Vec::new();
|
||||||
|
|
||||||
|
let mut ptr = root.list;
|
||||||
|
|
||||||
|
while !ptr.is_null() {
|
||||||
|
let element = snapshot.read::<DataList>(ptr);
|
||||||
|
|
||||||
|
items.push(element.data.get());
|
||||||
|
ptr = element.next;
|
||||||
|
}
|
||||||
|
|
||||||
|
assert_eq!(items.len(), i + 1);
|
||||||
|
for (expected, &is) in items.iter().rev().enumerate() {
|
||||||
|
assert_eq!(expected as u64, is);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
drop(snapshots);
|
||||||
|
|
||||||
|
// hexdump(db.map.as_bytes());
|
||||||
|
|
||||||
|
db.free_old_epochs();
|
||||||
|
|
||||||
|
// hexdump(db.map.as_bytes());
|
||||||
|
}
|
||||||
|
|
||||||
|
fn hexdump(bytes: &[u8]) {
|
||||||
let mut child = std::process::Command::new("hexdump")
|
let mut child = std::process::Command::new("hexdump")
|
||||||
.arg("-C")
|
.arg("-C")
|
||||||
.stdin(Stdio::piped())
|
.stdin(Stdio::piped())
|
||||||
@ -687,7 +898,7 @@ mod tests {
|
|||||||
|
|
||||||
let mut stdin = child.stdin.take().expect("failed to get stdin");
|
let mut stdin = child.stdin.take().expect("failed to get stdin");
|
||||||
|
|
||||||
stdin.write_all(db.map.as_bytes()).unwrap();
|
stdin.write_all(bytes).unwrap();
|
||||||
|
|
||||||
std::mem::drop(stdin);
|
std::mem::drop(stdin);
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user