start rewrite of transaction api
This commit is contained in:
parent
897af34e37
commit
137f6cd372
178
src/lib.rs
178
src/lib.rs
@ -1,6 +1,5 @@
|
|||||||
use std::{
|
use std::{
|
||||||
borrow::BorrowMut,
|
collections::{BTreeMap, VecDeque},
|
||||||
collections::{BTreeMap, HashMap},
|
|
||||||
fmt::Debug,
|
fmt::Debug,
|
||||||
fs::File,
|
fs::File,
|
||||||
mem::size_of,
|
mem::size_of,
|
||||||
@ -10,10 +9,14 @@ use std::{
|
|||||||
|
|
||||||
mod allocator;
|
mod allocator;
|
||||||
mod atomic_arc;
|
mod atomic_arc;
|
||||||
|
mod mapped;
|
||||||
|
mod transaction;
|
||||||
|
|
||||||
use allocator::{AllocatorState, FreeList, GeneralPurposeAllocator, SlabListPointer, SlabPointer};
|
use allocator::{AllocatorState, GeneralPurposeAllocator, SlabListPointer, SlabPointer};
|
||||||
use atomic_arc::AtomicArc;
|
use atomic_arc::AtomicArc;
|
||||||
|
use mapped::ReaderTrait;
|
||||||
use memmap::{Mmap, MmapMut};
|
use memmap::{Mmap, MmapMut};
|
||||||
|
use transaction::TransactionHandle;
|
||||||
use zerocopy::{AsBytes, FromBytes, LayoutVerified, Unaligned, LE};
|
use zerocopy::{AsBytes, FromBytes, LayoutVerified, Unaligned, LE};
|
||||||
|
|
||||||
const PAGE_SIZE: u64 = 4096;
|
const PAGE_SIZE: u64 = 4096;
|
||||||
@ -176,7 +179,7 @@ pub struct Db {
|
|||||||
map: MmapMut,
|
map: MmapMut,
|
||||||
slabs: BTreeMap<u32, SlabPointer>,
|
slabs: BTreeMap<u32, SlabPointer>,
|
||||||
state: Arc<AtomicArc<Snapshot>>,
|
state: Arc<AtomicArc<Snapshot>>,
|
||||||
snapshots: Vec<SnapshotAndFreeList>,
|
snapshots: VecDeque<SnapshotAndFreeList>,
|
||||||
}
|
}
|
||||||
|
|
||||||
struct SnapshotAndFreeList {
|
struct SnapshotAndFreeList {
|
||||||
@ -184,126 +187,6 @@ struct SnapshotAndFreeList {
|
|||||||
to_free: Vec<FileRange>,
|
to_free: Vec<FileRange>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Copy)]
|
|
||||||
struct Replaced {
|
|
||||||
from: FileRange,
|
|
||||||
to: FileRange,
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct TransactionHandle<'t> {
|
|
||||||
db: &'t mut Db,
|
|
||||||
replaced: HashMap<FilePointer, Replaced>,
|
|
||||||
new: HashMap<FilePointer, FileRange>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<'t> TransactionHandle<'t> {
|
|
||||||
pub unsafe fn modify_raw(&mut self, range: FileRange) -> (FileRange, &mut [u8]) {
|
|
||||||
let range = if let Some(&replaced) = self.replaced.get(&range.start) {
|
|
||||||
assert_eq!(replaced.from, range);
|
|
||||||
replaced.to
|
|
||||||
} else if let Some(&new) = self.new.get(&range.start) {
|
|
||||||
assert_eq!(new, range);
|
|
||||||
new
|
|
||||||
} else {
|
|
||||||
let (new, _) = self.allocate_raw(range.len());
|
|
||||||
|
|
||||||
self.db.copy(range, new);
|
|
||||||
|
|
||||||
let res = self.replaced.insert(
|
|
||||||
new.start,
|
|
||||||
Replaced {
|
|
||||||
from: range,
|
|
||||||
to: new,
|
|
||||||
},
|
|
||||||
);
|
|
||||||
debug_assert!(res.is_none());
|
|
||||||
|
|
||||||
new
|
|
||||||
};
|
|
||||||
|
|
||||||
(range, &mut self.db.map[range.as_range()])
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn read_raw(&mut self, range: FileRange) -> (FileRange, &[u8]) {
|
|
||||||
let range = if let Some(&replaced) = self.replaced.get(&range.start) {
|
|
||||||
assert_eq!(replaced.from, range);
|
|
||||||
replaced.to
|
|
||||||
} else if let Some(&new) = self.new.get(&range.start) {
|
|
||||||
assert_eq!(new, range);
|
|
||||||
new
|
|
||||||
} else {
|
|
||||||
range
|
|
||||||
};
|
|
||||||
|
|
||||||
(range, &self.db.map[range.as_range()])
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn allocate_raw(&mut self, length: u64) -> (FileRange, &mut [u8]) {
|
|
||||||
unsafe {
|
|
||||||
let range = self.allocate_range(length);
|
|
||||||
|
|
||||||
let res = self.new.insert(range.start, range);
|
|
||||||
debug_assert!(res.is_none());
|
|
||||||
|
|
||||||
(range, &mut self.db.map[range.as_range()])
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn modify_range<T: FromBytes + AsBytes>(
|
|
||||||
&mut self,
|
|
||||||
range: FileRange,
|
|
||||||
) -> (FileRange, &mut T) {
|
|
||||||
unsafe {
|
|
||||||
let (ptr, _) = self.modify_raw(range);
|
|
||||||
(ptr, self.db.modify_range(ptr))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn modify<T: FromBytes + AsBytes>(&mut self, at: FilePointer) -> (FileRange, &mut T) {
|
|
||||||
self.modify_range(at.range(size_of::<T>() as u64))
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn read_range<T: FromBytes>(&mut self, range: FileRange) -> (FileRange, &T) {
|
|
||||||
unsafe {
|
|
||||||
let (ptr, _) = self.read_raw(range);
|
|
||||||
(ptr, self.db.reference_range(ptr))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn read<T: FromBytes>(&mut self, at: FilePointer) -> (FileRange, &T) {
|
|
||||||
unsafe {
|
|
||||||
let (ptr, _) = self.read_raw(at.range(size_of::<T>() as u64));
|
|
||||||
(ptr, self.db.reference_range(ptr))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn allocate_size<T: FromBytes + AsBytes>(&mut self, length: u64) -> (FileRange, &mut T) {
|
|
||||||
unsafe {
|
|
||||||
let (ptr, _) = self.allocate_raw(length);
|
|
||||||
(ptr, self.db.modify_range(ptr))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn allocate<T: FromBytes + AsBytes>(&mut self) -> (FileRange, &mut T) {
|
|
||||||
unsafe {
|
|
||||||
let (ptr, _) = self.allocate_raw(size_of::<T>() as u64);
|
|
||||||
(ptr, self.db.modify_range(ptr))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn free(&mut self, range: FileRange) {
|
|
||||||
self.db.free(range)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn allocate_range(&mut self, size: u64) -> FileRange {
|
|
||||||
self.db.allocate(size)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn root(&self) -> FilePointer {
|
|
||||||
self.db.header().root
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Db {
|
impl Db {
|
||||||
fn header(&self) -> &Header {
|
fn header(&self) -> &Header {
|
||||||
unsafe {
|
unsafe {
|
||||||
@ -332,42 +215,35 @@ impl Db {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn transaction(&mut self, f: impl FnOnce(&mut TransactionHandle) -> FilePointer) {
|
fn transaction(&mut self, f: impl FnOnce(&mut TransactionHandle) -> FilePointer) {
|
||||||
let mut handle = TransactionHandle {
|
let mut handle = TransactionHandle::new(self);
|
||||||
db: self,
|
|
||||||
replaced: HashMap::new(),
|
|
||||||
new: HashMap::new(),
|
|
||||||
};
|
|
||||||
|
|
||||||
let root = f(&mut handle);
|
let root = f(&mut handle);
|
||||||
|
|
||||||
let to_free = handle
|
let to_free = handle.to_free();
|
||||||
.replaced
|
|
||||||
.values()
|
|
||||||
.map(|replaced| replaced.from)
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
let snapshot = self.update_root(root);
|
let snapshot = self.update_root(root);
|
||||||
|
|
||||||
self.snapshots
|
self.snapshots
|
||||||
.push(SnapshotAndFreeList { snapshot, to_free });
|
.push_back(SnapshotAndFreeList { snapshot, to_free });
|
||||||
}
|
}
|
||||||
|
|
||||||
fn free_old_epochs(&mut self) {
|
fn free_old_epochs(&mut self) {
|
||||||
let mut snapshots = std::mem::take(&mut self.snapshots);
|
let mut snapshots = std::mem::take(&mut self.snapshots);
|
||||||
|
|
||||||
snapshots.retain_mut(|snapshot| {
|
while snapshots
|
||||||
if Arc::get_mut(&mut snapshot.snapshot).is_some() {
|
.front_mut()
|
||||||
println!("freeing epoch");
|
.is_some_and(|snapshot| Arc::get_mut(&mut snapshot.snapshot).is_some())
|
||||||
// if the snapshot is uniqe we are the only owner and can
|
{
|
||||||
// free the epoch.
|
// if the snapshot is uniqe we are the only owner and can free the epoch.
|
||||||
for allocation in std::mem::take(&mut snapshot.to_free) {
|
|
||||||
self.free(allocation);
|
println!("freeing epoch");
|
||||||
}
|
|
||||||
false
|
let snapshot = snapshots.pop_front().unwrap();
|
||||||
} else {
|
|
||||||
true
|
for allocation in snapshot.to_free {
|
||||||
|
self.free(allocation);
|
||||||
}
|
}
|
||||||
});
|
}
|
||||||
|
|
||||||
self.snapshots = snapshots;
|
self.snapshots = snapshots;
|
||||||
}
|
}
|
||||||
@ -530,7 +406,7 @@ impl Db {
|
|||||||
file,
|
file,
|
||||||
map,
|
map,
|
||||||
slabs: BTreeMap::new(),
|
slabs: BTreeMap::new(),
|
||||||
snapshots: Vec::new(),
|
snapshots: VecDeque::new(),
|
||||||
};
|
};
|
||||||
|
|
||||||
unsafe {
|
unsafe {
|
||||||
@ -557,7 +433,7 @@ impl Db {
|
|||||||
file,
|
file,
|
||||||
map,
|
map,
|
||||||
slabs: BTreeMap::new(),
|
slabs: BTreeMap::new(),
|
||||||
snapshots: Vec::new(),
|
snapshots: VecDeque::new(),
|
||||||
};
|
};
|
||||||
|
|
||||||
let _ = db.state.swap(Arc::new(Snapshot {
|
let _ = db.state.swap(Arc::new(Snapshot {
|
||||||
@ -839,7 +715,7 @@ mod tests {
|
|||||||
root
|
root
|
||||||
};
|
};
|
||||||
|
|
||||||
let (root, &data) = transaction.read::<DataHeader>(root);
|
let &data = transaction.read::<DataHeader>(root);
|
||||||
|
|
||||||
assert_eq!(data.generation.get(), i);
|
assert_eq!(data.generation.get(), i);
|
||||||
|
|
||||||
@ -848,7 +724,7 @@ mod tests {
|
|||||||
element.next = data.list;
|
element.next = data.list;
|
||||||
element.data = i.into();
|
element.data = i.into();
|
||||||
|
|
||||||
let (root, data) = transaction.modify::<DataHeader>(root.start);
|
let (root, data) = transaction.modify::<DataHeader>(root);
|
||||||
data.list = elem_ptr.start;
|
data.list = elem_ptr.start;
|
||||||
data.generation = (i + 1).into();
|
data.generation = (i + 1).into();
|
||||||
root.start
|
root.start
|
||||||
|
14
src/mapped.rs
Normal file
14
src/mapped.rs
Normal file
@ -0,0 +1,14 @@
|
|||||||
|
use std::mem::size_of;
|
||||||
|
|
||||||
|
use zerocopy::{FromBytes, LayoutVerified};
|
||||||
|
|
||||||
|
use crate::{FilePointer, FileRange};
|
||||||
|
|
||||||
|
pub trait ReaderTrait {
|
||||||
|
fn read_raw(&self, ptr: FileRange) -> &[u8];
|
||||||
|
fn read<T: FromBytes>(&self, ptr: FilePointer) -> &T {
|
||||||
|
LayoutVerified::<_, T>::new(self.read_raw(ptr.range(size_of::<T>() as u64)))
|
||||||
|
.unwrap()
|
||||||
|
.into_ref()
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user