soruh 2023-07-24 23:31:16 +02:00
parent 88e35f3c15
commit c762d6b18f
3 changed files with 227 additions and 125 deletions

src/allocator.rs (new file)

@@ -0,0 +1,64 @@
use std::mem::size_of;

use zerocopy::{AsBytes, FromBytes, Unaligned};

use crate::{Db, FilePointer, Header, PAGE_SIZE, U32};

#[derive(Clone, Copy, FromBytes, AsBytes, Unaligned)]
#[repr(transparent)]
pub struct FreeList {
    head: FilePointer,
}

impl FreeList {
    pub fn empty() -> Self {
        Self {
            head: FilePointer::null(),
        }
    }
}

#[derive(Clone, Copy, FromBytes, AsBytes, Unaligned)]
#[repr(C)]
pub struct AllocatorState {
    pub general: FreeList,
    pub slabs: FilePointer,
}

#[derive(Clone, Copy, FromBytes, AsBytes, Unaligned)]
#[repr(C)]
pub struct PoolListHeader {
    next: FilePointer,
    size: U32,
    len: U32,
}

impl PoolListHeader {
    fn capacity(&self) -> u32 {
        (self.size.get() - size_of::<PoolListHeader>() as u32) / size_of::<SizedFreeList>() as u32
    }
}

#[derive(Clone, Copy, FromBytes, AsBytes, Unaligned)]
#[repr(C)]
pub struct SizedFreeList {
    element_size: U32,
    head: FreeList,
}

impl AllocatorState {
    pub fn init(&self, db: &mut Db, size: U32) {
        db.write(
            self.slabs,
            PoolListHeader {
                next: FilePointer::null(),
                size,
                len: 0.into(),
            },
        );
    }

    fn slabs_mut<'db>(&self, db: &'db mut Db) -> &'db mut PoolListHeader {
        db.modify(self.slabs)
    }
}
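A side note on the arithmetic in PoolListHeader::capacity: it divides whatever space is left in a pool page after the header by the size of one SizedFreeList slot. A standalone sketch, not the committed code: plain byte arrays stand in for the unaligned little-endian wrappers so the sizes match the on-disk layout, and the 40-byte Header size is derived from this commit's fields.

use std::mem::size_of;

// Byte arrays mimic the zerocopy Unaligned wrappers: no padding, same sizes.
#[repr(C)]
struct PoolListHeader {
    next: [u8; 8], // FilePointer
    size: [u8; 4], // U32
    len: [u8; 4],  // U32
}

#[repr(C)]
struct SizedFreeList {
    element_size: [u8; 4], // U32
    head: [u8; 8],         // FreeList(FilePointer)
}

// Same formula as PoolListHeader::capacity above.
fn capacity(size: u32) -> u32 {
    (size - size_of::<PoolListHeader>() as u32) / size_of::<SizedFreeList>() as u32
}

fn main() {
    assert_eq!(size_of::<PoolListHeader>(), 16);
    assert_eq!(size_of::<SizedFreeList>(), 12);
    // init is called with PAGE_SIZE minus the file header (4096 - 40 = 4056);
    // after the 16-byte pool header, 4040 / 12 = 336 slots fit.
    assert_eq!(capacity(4056), 336);
}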

src/atomic_arc.rs

@@ -37,6 +37,7 @@ impl<T> AtomicArc<T> {
         }
     }
 
+    #[must_use]
     pub fn swap(&self, new: Arc<T>) -> Arc<T> {
         unsafe {
             let old = self
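The new #[must_use] exists because swap returns the previous Arc, and that return value is the caller's only handle on when the old snapshot is dropped; silently discarding it is almost always a bug. A minimal lock-based stand-in for the contract, illustrative only, not the lock-free implementation in this file:

use std::sync::{Arc, Mutex};

// Stand-in for AtomicArc<T>: same swap signature, but lock-based.
struct SwapCell<T>(Mutex<Arc<T>>);

impl<T> SwapCell<T> {
    fn new(value: Arc<T>) -> Self {
        Self(Mutex::new(value))
    }

    #[must_use]
    fn swap(&self, new: Arc<T>) -> Arc<T> {
        std::mem::replace(&mut *self.0.lock().unwrap(), new)
    }
}

fn main() {
    let cell = SwapCell::new(Arc::new(1));
    // Ignoring the result would trigger the must_use warning; the returned
    // Arc is the last handle keeping the old value alive.
    let old = cell.swap(Arc::new(2));
    assert_eq!(*old, 1);
}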

crate root (src/lib.rs or src/main.rs)

@@ -1,24 +1,46 @@
-use std::{collections::HashMap, fs::File, mem::size_of, ops::Range, sync::Arc};
+use std::{borrow::BorrowMut, collections::HashMap, fs::File, mem::size_of, ops::Range, sync::Arc};
 
+mod allocator;
 mod atomic_arc;
 
+use allocator::{AllocatorState, FreeList};
 use atomic_arc::AtomicArc;
 use memmap::{Mmap, MmapMut};
 use zerocopy::{AsBytes, FromBytes, LayoutVerified, Unaligned, LE};
 
+const PAGE_SIZE: u64 = 4096;
+
 type U64 = zerocopy::byteorder::U64<LE>;
+type U32 = zerocopy::byteorder::U32<LE>;
 
 #[derive(Clone, Copy, FromBytes, AsBytes, Unaligned, Debug, Hash, PartialEq, Eq)]
 #[repr(transparent)]
-struct FilePointer(U64);
+pub struct FilePointer(U64);
+
+impl FilePointer {
+    fn page(n: u64) -> Self {
+        Self((n * PAGE_SIZE).into())
+    }
+
+    fn null() -> Self {
+        Self(U64::ZERO)
+    }
+}
 
-#[derive(Clone, Copy, FromBytes, AsBytes, Unaligned, Debug)]
+#[derive(Clone, Copy, FromBytes, AsBytes, Unaligned, Debug, PartialEq, Eq)]
 #[repr(C)]
-struct FileRange {
+pub struct FileRange {
     start: FilePointer,
     len: U64,
 }
 
+impl std::ops::Add<u64> for FilePointer {
+    type Output = Self;
+
+    fn add(self, rhs: u64) -> Self::Output {
+        Self((self.0.get() + rhs).into())
+    }
+}
+
 impl FilePointer {
     pub fn range(&self, len: u64) -> FileRange {
         FileRange {
@@ -43,13 +65,18 @@ impl FileRange {
 struct Header {
     magic: [u8; 16],
     root: FilePointer,
+    allocator_state: AllocatorState,
 }
 
 impl Default for Header {
     fn default() -> Self {
         Self {
-            magic: [0; 16],
-            root: FilePointer(0.into()),
+            magic: *b"cool db format 1",
+            root: FilePointer::null(),
+            allocator_state: AllocatorState {
+                general: FreeList::empty(),
+                slabs: FilePointer::page(0) + size_of::<Header>() as u64,
+            },
         }
     }
 }
@@ -63,7 +90,7 @@ pub struct Reader {
     state: Arc<AtomicArc<Snapshot>>,
 }
 
-struct Db {
+pub struct Db {
     file: File,
     map: MmapMut,
     header: Header,
@@ -71,111 +98,76 @@ struct Db {
 }
 
 #[derive(Clone, Copy)]
-struct Modification {
-    old_range: Option<FileRange>,
-    new_range: FileRange,
+struct Replaced {
+    from: FileRange,
+    to: Option<FileRange>,
 }
 
-struct TransactionHandle<'t> {
+pub struct TransactionHandle<'t> {
     db: &'t mut Db,
-    modifications: HashMap<FilePointer, Modification>,
-    allocator: Allocator,
-}
-
-struct Allocator {
-    head: FilePointer,
-}
-
-impl Allocator {
-    unsafe fn allocate_range(&mut self, size: u64) -> FileRange {
-        todo!()
-    }
-
-    unsafe fn free_range(&mut self, range: FileRange) {
-        todo!()
-    }
-
-    unsafe fn allocate_modification(
-        &mut self,
-        old_range: Option<FileRange>,
-        size: u64,
-    ) -> Modification {
-        let new_range = self.allocate_range(size);
-
-        Modification {
-            old_range,
-            new_range,
-        }
-    }
+    replaced: HashMap<FilePointer, Replaced>,
+    new: HashMap<FilePointer, FileRange>,
 }
 
 impl<'t> TransactionHandle<'t> {
-    fn get_data<'a>(&'a mut self, modification: Modification) -> (FileRange, &'a mut [u8]) {
-        (
-            modification.new_range,
-            &mut self.db.map[modification.new_range.as_range()],
-        )
-    }
-
-    pub unsafe fn free(&mut self, range: FileRange) {
-        self.allocator.free_range(range)
-    }
-
-    pub unsafe fn allocate(&mut self, new_size: u64) -> (FileRange, &mut [u8]) {
-        let modification = self.allocator.allocate_modification(None, new_size);
-
-        assert!(self
-            .modifications
-            .insert(modification.new_range.start, modification)
-            .is_none());
-
-        let modification = *self
-            .modifications
-            .get(&modification.new_range.start)
-            .unwrap();
-
-        self.get_data(modification)
+    unsafe fn get_data(&mut self, range: FileRange) -> &mut [u8] {
+        &mut self.db.map[range.as_range()]
     }
 
     pub unsafe fn modify(&mut self, range: FileRange) -> (FileRange, &mut [u8]) {
-        self.resize(range, range.len())
-    }
-
-    fn resize(&mut self, range: FileRange, new_size: u64) -> (FileRange, &mut [u8]) {
-        let modification = *self
-            .modifications
-            .entry(range.start)
-            .or_insert_with(|| unsafe {
-                self.allocator.allocate_modification(Some(range), new_size)
-            });
-
-        assert_eq!(
-            modification.new_range.len(),
-            new_size,
-            "tried to resize a region twice."
-        );
-
-        let n = usize::try_from(range.len().min(new_size)).unwrap();
-
-        {
-            let old_range = range.as_range();
-            let new_range = modification.new_range.as_range();
-            assert!(!old_range.contains(&new_range.start));
-            assert!(!old_range.contains(&new_range.end));
-            assert!(!new_range.contains(&old_range.start));
-            assert!(!new_range.contains(&old_range.end));
-        }
-
-        // this is fine, because we just allocated the space we copy into, so it can't overlap with the source
-        let old_data: &'static [u8] = unsafe { &*(&self.db.map[range.as_range()] as *const _) };
-
-        let data = self.get_data(modification);
-
-        data.1[..n].copy_from_slice(&old_data[..n]);
-
-        data
+        let new = if let Some(&replaced) = self.replaced.get(&range.start) {
+            assert_eq!(replaced.from, range);
+            if let Some(to) = replaced.to {
+                assert_eq!(to, range);
+                to
+            } else {
+                let (to, _) = self.allocate(range.len());
+                self.replaced.get_mut(&range.start).unwrap().to = Some(to);
+                to
+            }
+        } else if let Some(&new) = self.new.get(&range.start) {
+            assert_eq!(new, range);
+            new
+        } else {
+            let (new, _) = self.allocate(range.len());
+            let res = self.replaced.insert(
+                new.start,
+                Replaced {
+                    from: range,
+                    to: Some(new),
+                },
+            );
+            debug_assert!(res.is_none());
+            new
+        };
+
+        (new, self.get_data(new))
+    }
+
+    pub fn allocate(&mut self, length: u64) -> (FileRange, &mut [u8]) {
+        unsafe {
+            let new = self.allocate_range(length);
+
+            let res = self.new.insert(new.start, new);
+            debug_assert!(res.is_none());
+
+            (new, self.get_data(new))
+        }
+    }
+
+    pub fn free(&mut self, range: FileRange) {
+        //
+    }
+
+    fn allocate_range(&mut self, length: u64) -> FileRange {
+        let range: FileRange = todo!();
+
+        range
     }
 }
@@ -199,6 +191,9 @@ impl Db {
     }
 
     fn update_root(&mut self, new_root: FilePointer) -> Arc<Snapshot> {
+        // TODO: we could write some data + flush here for better consistency,
+        // e.g. a copy of the new root pointer
+
         // flush all data in file
         self.map.flush().unwrap();
@@ -208,8 +203,6 @@
             .flush_range(Self::root_ptr().0.get() as usize, size_of::<FilePointer>())
             .unwrap();
 
-        // TODO: we could do a second write + flush here for better consistency
-
         // update data that readers see
         self.state.swap(Arc::new(Snapshot {
             root: new_root,
@@ -237,6 +230,26 @@ impl Db {
             .write(data)
     }
 
+    fn modify<T: FromBytes + AsBytes>(&mut self, at: FilePointer) -> &mut T {
+        self.modify_range(at.range(size_of::<T>() as u64))
+    }
+
+    fn modify_range<T: FromBytes + AsBytes>(&mut self, range: FileRange) -> &mut T {
+        LayoutVerified::<_, T>::new(&mut self.map[range.as_range()])
+            .unwrap()
+            .into_mut()
+    }
+
+    fn reference<T: FromBytes + AsBytes>(&self, at: FilePointer) -> &T {
+        self.reference_range(at.range(size_of::<T>() as u64))
+    }
+
+    fn reference_range<T: FromBytes + AsBytes>(&self, range: FileRange) -> &T {
+        LayoutVerified::<_, T>::new(&self.map[range.as_range()])
+            .unwrap()
+            .into_ref()
+    }
+
     fn remap(&mut self) {
         let map = unsafe { MmapMut::map_mut(&self.file) }.unwrap();
         self.map = map;
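The modify/reference helpers added here are thin wrappers over zerocopy's LayoutVerified, which validates length and alignment before handing out a typed view of raw bytes (alignment is trivially satisfied because the on-disk types are all Unaligned). A self-contained sketch of the pattern, using a hypothetical Pair type and the zerocopy 0.6-era API from this commit:

use zerocopy::byteorder::U32;
use zerocopy::{AsBytes, FromBytes, LayoutVerified, Unaligned, LE};

// Hypothetical on-disk struct; Unaligned little-endian fields let
// LayoutVerified::new succeed at any byte offset, as in the Db helpers.
#[derive(FromBytes, AsBytes, Unaligned)]
#[repr(C)]
struct Pair {
    a: U32<LE>,
    b: U32<LE>,
}

fn main() {
    let mut buf = [0u8; 8];

    // Typed mutable view over raw bytes, like Db::modify_range.
    let pair: &mut Pair = LayoutVerified::<_, Pair>::new(&mut buf[..])
        .unwrap()
        .into_mut();
    pair.a = 1.into();
    pair.b = 2.into();

    // The writes landed in the backing buffer, little-endian.
    assert_eq!(buf, [1, 0, 0, 0, 2, 0, 0, 0]);
}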
@@ -246,56 +259,80 @@ impl Db {
         unsafe { Mmap::map(&self.file) }.unwrap()
     }
 
-    fn resize(&mut self, len: u64) {
-        self.file.set_len(len).unwrap();
+    fn add_pages(&mut self, n: u64) {
+        self.file
+            .set_len(self.file.metadata().unwrap().len() + PAGE_SIZE * n)
+            .unwrap();
         self.remap();
     }
 
     pub fn new(file: File) -> Self {
         let len = file.metadata().unwrap().len();
         if len == 0 {
-            file.set_len(size_of::<Header>() as u64).unwrap();
+            file.set_len(PAGE_SIZE).unwrap();
         }
 
-        dbg!(&len);
-
-        let mut map = unsafe { MmapMut::map_mut(&file) }.unwrap();
-
-        // TODO use the crate Db object and call Db::write(Db::header_ptr())
-        let mut header_bytes =
-            LayoutVerified::<_, Header>::new(&mut map[..size_of::<Header>()]).unwrap();
-
-        let header = if len == 0 {
-            let header = Header::default();
-            header_bytes.write(header);
-            header
-        } else {
-            header_bytes.read()
-        };
-
-        let state = Arc::new(AtomicArc::new(Arc::new(Snapshot {
-            root: header.root,
-            map: todo!(),
-        })));
-
-        Self {
+        let map = unsafe { MmapMut::map_mut(&file) }.unwrap();
+
+        let mut db = Self {
+            state: Arc::new(AtomicArc::new(Arc::new(Snapshot {
+                root: FilePointer::null(),
+                map: unsafe { Mmap::map(&file).unwrap() },
+            }))),
             file,
             map,
-            header,
-            state,
-        }
+            header: Header::default(),
+        };
+
+        if len == 0 {
+            db.init_allocator();
+            db.write(Self::header_ptr(), db.header);
+        } else {
+            db.header = db.read(Self::header_ptr());
+        }
+
+        let _ = db.state.swap(Arc::new(Snapshot {
+            root: db.header.root,
+            map: unsafe { Mmap::map(&db.file).unwrap() },
+        }));
+
+        db
+    }
+
+    fn init_allocator(&mut self) {
+        let allocator_state = self.header.allocator_state;
+        allocator_state.init(
+            self,
+            (PAGE_SIZE - size_of::<Header>() as u64).try_into().unwrap(),
+        );
     }
 }
 
 #[cfg(test)]
 mod tests {
+    use std::io::Write;
+    use std::process::Stdio;
+
     use super::*;
 
     #[test]
     fn it_works() {
-        Db::root_ptr();
-
         let db = Db::new(tempfile::tempfile().unwrap());
+
+        let mut child = std::process::Command::new("hexdump")
+            .arg("-C")
+            .stdin(Stdio::piped())
+            .stdout(Stdio::inherit())
+            .spawn()
+            .unwrap();
+
+        let mut stdin = child.stdin.take().expect("failed to get stdin");
+        stdin.write_all(db.map.as_bytes()).unwrap();
+        std::mem::drop(stdin);
+
+        child.wait().unwrap();
     }
 }
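Taken together, the reworked TransactionHandle is shadow paging: modify never writes to live bytes, it copies the range into freshly allocated space, tracks old → new in replaced, and the commit point is the root-pointer write that update_root flushes only after all data pages. A toy model of that flow, with a Vec<u8> in place of the mmap and a bump allocator in place of the free lists; the names are illustrative, not from the commit:

use std::collections::HashMap;

// `file` stands in for the mmap, `next_free` for the allocator,
// `root` for the header's root pointer.
struct ToyDb {
    file: Vec<u8>,
    next_free: usize,
    root: usize,
}

struct Txn<'t> {
    db: &'t mut ToyDb,
    replaced: HashMap<usize, usize>, // old offset -> new offset
}

impl<'t> Txn<'t> {
    // Copy-on-write: readers holding the old offsets stay consistent.
    fn modify(&mut self, offset: usize, len: usize) -> usize {
        if let Some(&new) = self.replaced.get(&offset) {
            return new; // already shadowed in this transaction
        }
        let new = self.db.next_free;
        self.db.next_free += len;
        self.db.file.resize(self.db.next_free, 0);
        self.db.file.copy_within(offset..offset + len, new);
        self.replaced.insert(offset, new);
        new
    }

    // Publishing the new root is the single commit point; the real code
    // flushes it only after all data pages have been flushed.
    fn commit(self, new_root: usize) {
        self.db.root = new_root;
    }
}

fn main() {
    let mut db = ToyDb { file: vec![7; 16], next_free: 16, root: 0 };

    let mut txn = Txn { db: &mut db, replaced: HashMap::new() };
    let new = txn.modify(0, 16);
    txn.db.file[new] = 42; // write goes to the shadow copy
    txn.commit(new);

    assert_eq!(db.file[0], 7); // original bytes untouched
    assert_eq!(db.file[db.root], 42); // committed root sees the new data
}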