From 88e35f3c15f8b9f1d44dc6783d8428771e9d4064 Mon Sep 17 00:00:00 2001
From: soruh
Date: Sun, 23 Jul 2023 23:34:14 +0200
Subject: [PATCH] initial commit

---
 .gitignore        |   2 +
 Cargo.toml        |  13 ++
 src/atomic_arc.rs |  76 ++++++++++++
 src/lib.rs        | 301 ++++++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 392 insertions(+)
 create mode 100644 .gitignore
 create mode 100644 Cargo.toml
 create mode 100644 src/atomic_arc.rs
 create mode 100644 src/lib.rs

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..4fffb2f
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,2 @@
+/target
+/Cargo.lock
diff --git a/Cargo.toml b/Cargo.toml
new file mode 100644
index 0000000..4e63b6e
--- /dev/null
+++ b/Cargo.toml
@@ -0,0 +1,13 @@
+[package]
+name = "cow_file"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+memmap = "0.7.0"
+zerocopy = "0.6.1"
+
+[dev-dependencies]
+tempfile = "3.7.0"
diff --git a/src/atomic_arc.rs b/src/atomic_arc.rs
new file mode 100644
index 0000000..44d98c7
--- /dev/null
+++ b/src/atomic_arc.rs
@@ -0,0 +1,76 @@
+use std::{
+    marker::PhantomData,
+    sync::{
+        atomic::{AtomicPtr, Ordering},
+        Arc,
+    },
+};
+
+pub struct AtomicArc<T> {
+    inner: AtomicPtr<T>,
+    _phantom: PhantomData<Arc<T>>,
+}
+
+impl<T> Drop for AtomicArc<T> {
+    fn drop(&mut self) {
+        unsafe { Arc::decrement_strong_count(self.load()) }
+    }
+}
+
+impl<T> AtomicArc<T> {
+    fn load(&self) -> *const T {
+        self.inner.load(Ordering::Relaxed)
+    }
+
+    pub fn new(inner: Arc<T>) -> Self {
+        Self {
+            inner: AtomicPtr::new(Arc::into_raw(inner) as *mut T),
+            _phantom: PhantomData,
+        }
+    }
+
+    pub fn get(&self) -> Arc<T> {
+        unsafe {
+            let ptr = self.load();
+            Arc::increment_strong_count(ptr);
+            Arc::from_raw(ptr)
+        }
+    }
+
+    pub fn swap(&self, new: Arc<T>) -> Arc<T> {
+        unsafe {
+            let old = self
+                .inner
+                .swap(Arc::into_raw(new) as *mut T, Ordering::Relaxed);
+
+            Arc::from_raw(old)
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::sync::Arc;
+
+    use super::AtomicArc;
+
+    #[test]
+    fn test_basic() {
+        let atomic_arc = AtomicArc::new(Arc::new(1));
+
+        let first = atomic_arc.get();
+        assert_eq!(*first, 1);
+
+        atomic_arc.swap(Arc::new(2));
+
+        assert_eq!(*first, 1);
+
+        Arc::into_inner(first).unwrap();
+
+        let second = atomic_arc.get();
+
+        std::mem::drop(atomic_arc);
+
+        Arc::into_inner(second).unwrap();
+    }
+}
diff --git a/src/lib.rs b/src/lib.rs
new file mode 100644
index 0000000..e716bcb
--- /dev/null
+++ b/src/lib.rs
@@ -0,0 +1,301 @@
+use std::{collections::HashMap, fs::File, mem::size_of, ops::Range, sync::Arc};
+
+mod atomic_arc;
+
+use atomic_arc::AtomicArc;
+use memmap::{Mmap, MmapMut};
+use zerocopy::{AsBytes, FromBytes, LayoutVerified, Unaligned, LE};
+
+type U64 = zerocopy::byteorder::U64<LE>;
+
+#[derive(Clone, Copy, FromBytes, AsBytes, Unaligned, Debug, Hash, PartialEq, Eq)]
+#[repr(transparent)]
+struct FilePointer(U64);
+
+#[derive(Clone, Copy, FromBytes, AsBytes, Unaligned, Debug)]
+#[repr(C)]
+struct FileRange {
+    start: FilePointer,
+    len: U64,
+}
+
+impl FilePointer {
+    pub fn range(&self, len: u64) -> FileRange {
+        FileRange {
+            start: *self,
+            len: U64::new(len),
+        }
+    }
+}
+
+impl FileRange {
+    fn len(&self) -> u64 {
+        self.len.get()
+    }
+    fn as_range(&self) -> Range<usize> {
+        let start = self.start.0.get().try_into().unwrap();
+        start..start + usize::try_from(self.len()).unwrap()
+    }
+}
+
+#[derive(Clone, Copy, FromBytes, AsBytes, Unaligned)]
+#[repr(C)]
+struct Header {
+    magic: [u8; 16],
+    root: FilePointer,
+}
+
+impl Default for Header {
+    fn default() -> Self {
+        Self {
+            magic: [0; 16],
+            root: FilePointer(0.into()),
+        }
+    }
+}
+
+struct Snapshot {
+    root: FilePointer,
+    map: Mmap,
+}
+
+pub struct Reader {
+    state: Arc<AtomicArc<Snapshot>>,
+}
+
+struct Db {
+    file: File,
+    map: MmapMut,
+    header: Header,
+    state: Arc<AtomicArc<Snapshot>>,
+}
+
+#[derive(Clone, Copy)]
+struct Modification {
+    old_range: Option<FileRange>,
+    new_range: FileRange,
+}
+
+struct TransactionHandle<'t> {
+    db: &'t mut Db,
+    modifications: HashMap<FilePointer, Modification>,
+    allocator: Allocator,
+}
+
+struct Allocator {
+    head: FilePointer,
+}
+
+impl Allocator {
+    unsafe fn allocate_range(&mut self, size: u64) -> FileRange {
+        todo!()
+    }
+
+    unsafe fn free_range(&mut self, range: FileRange) {
+        todo!()
+    }
+
+    unsafe fn allocate_modification(
+        &mut self,
+        old_range: Option<FileRange>,
+        size: u64,
+    ) -> Modification {
+        let new_range = self.allocate_range(size);
+
+        Modification {
+            old_range,
+            new_range,
+        }
+    }
+}
+
+impl<'t> TransactionHandle<'t> {
+    fn get_data<'a>(&'a mut self, modification: Modification) -> (FileRange, &'a mut [u8]) {
+        (
+            modification.new_range,
+            &mut self.db.map[modification.new_range.as_range()],
+        )
+    }
+
+    pub unsafe fn free(&mut self, range: FileRange) {
+        self.allocator.free_range(range)
+    }
+
+    pub unsafe fn allocate(&mut self, new_size: u64) -> (FileRange, &mut [u8]) {
+        let modification = self.allocator.allocate_modification(None, new_size);
+
+        assert!(self
+            .modifications
+            .insert(modification.new_range.start, modification)
+            .is_none());
+
+        let modification = *self
+            .modifications
+            .get(&modification.new_range.start)
+            .unwrap();
+
+        self.get_data(modification)
+    }
+
+    pub unsafe fn modify(&mut self, range: FileRange) -> (FileRange, &mut [u8]) {
+        self.resize(range, range.len())
+    }
+
+    fn resize(&mut self, range: FileRange, new_size: u64) -> (FileRange, &mut [u8]) {
+        let modification = *self
+            .modifications
+            .entry(range.start)
+            .or_insert_with(|| unsafe {
+                self.allocator.allocate_modification(Some(range), new_size)
+            });
+
+        assert_eq!(
+            modification.new_range.len(),
+            new_size,
+            "tried to resize a region twice."
+        );
+
+        let n = usize::try_from(range.len().min(new_size)).unwrap();
+
+        {
+            let old_range = range.as_range();
+            let new_range = modification.new_range.as_range();
+
+            assert!(!old_range.contains(&new_range.start));
+            assert!(!old_range.contains(&new_range.end));
+
+            assert!(!new_range.contains(&old_range.start));
+            assert!(!new_range.contains(&old_range.end));
+        }
+
+        // this is fine, because we just allocated the space we copy into, so it can't overlap with the source
+        let old_data: &'static [u8] = unsafe { &*(&self.db.map[range.as_range()] as *const _) };
+
+        let data = self.get_data(modification);
+
+        data.1[..n].copy_from_slice(&old_data[..n]);
+
+        data
+    }
+}
+
+impl Db {
+    fn transaction(f: fn(TransactionHandle)) {
+        // let handle = TransactionHandle {};
+    }
+
+    fn header_ptr() -> FilePointer {
+        FilePointer(0.into())
+    }
+
+    fn root_ptr() -> FilePointer {
+        FilePointer(16.into())
+    }
+
+    pub fn create_reader(&self) -> Reader {
+        Reader {
+            state: self.state.clone(),
+        }
+    }
+
+    fn update_root(&mut self, new_root: FilePointer) -> Arc<Snapshot> {
+        // flush all data in file
+        self.map.flush().unwrap();
+
+        // update root pointer and immediately flush
+        self.write(Self::root_ptr(), new_root);
+        self.map
+            .flush_range(Self::root_ptr().0.get() as usize, size_of::<FilePointer>())
+            .unwrap();
+
+        // TODO: we could do a second write + flush here for better consistency
+
+        // update data that readers see
+        self.state.swap(Arc::new(Snapshot {
+            root: new_root,
+            map: self.create_readonly_map(),
+        }))
+    }
+
+    fn read<T: FromBytes>(&self, at: FilePointer) -> T {
+        self.read_range(at.range(size_of::<T>() as u64))
+    }
+
+    fn read_range<T: FromBytes>(&self, range: FileRange) -> T {
+        LayoutVerified::<_, T>::new(&self.map[range.as_range()])
+            .unwrap()
+            .read()
+    }
+
+    fn write<T: AsBytes>(&mut self, at: FilePointer, data: T) {
+        self.write_range(at.range(size_of::<T>() as u64), data)
+    }
+
+    fn write_range<T: AsBytes>(&mut self, range: FileRange, data: T) {
+        LayoutVerified::<_, T>::new(&mut self.map[range.as_range()])
+            .unwrap()
+            .write(data)
+    }
+
+    fn remap(&mut self) {
+        let map = unsafe { MmapMut::map_mut(&self.file) }.unwrap();
+        self.map = map;
+    }
+
+    fn create_readonly_map(&self) -> Mmap {
+        unsafe { Mmap::map(&self.file) }.unwrap()
+    }
+
+    fn resize(&mut self, len: u64) {
+        self.file.set_len(len).unwrap();
+        self.remap();
+    }
+
+    pub fn new(file: File) -> Self {
+        let len = file.metadata().unwrap().len();
+        if len == 0 {
+            file.set_len(size_of::<Header>() as u64).unwrap();
+        }
+
+        dbg!(&len);
+
+        let mut map = unsafe { MmapMut::map_mut(&file) }.unwrap();
+
+        // TODO use the crate Db object and call Db::write(Db::header_ptr())
+        let mut header_bytes =
+            LayoutVerified::<_, Header>::new(&mut map[..size_of::<Header>()]).unwrap();
+
+        let header = if len == 0 {
+            let header = Header::default();
+            header_bytes.write(header);
+            header
+        } else {
+            header_bytes.read()
+        };
+
+        let state = Arc::new(AtomicArc::new(Arc::new(Snapshot {
+            root: header.root,
+            map: todo!(),
+        })));
+
+        Self {
+            file,
+            map,
+            header,
+            state,
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+
+    use super::*;
+
+    #[test]
+    fn it_works() {
+        Db::root_ptr();
+
+        let db = Db::new(tempfile::tempfile().unwrap());
+    }
+}
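
Note (not part of the patch): AtomicArc hands out snapshots by bumping the strong count in get() and publishing a new Arc in swap(), so a reader that already holds an Arc keeps its old value while later readers observe the new one. A minimal sketch of an extra test exercising that handoff, assuming it sits in the same tests module as test_basic in src/atomic_arc.rs; the test name is made up.

#[test]
fn test_old_readers_keep_their_snapshot() {
    let atomic_arc = AtomicArc::new(Arc::new("v1"));

    // A reader takes a snapshot before the writer publishes anything new.
    let old_snapshot = atomic_arc.get();

    // The writer swaps in a new value; swap() returns the previously stored Arc.
    let previous = atomic_arc.swap(Arc::new("v2"));
    assert_eq!(*previous, "v1");

    // The earlier reader still sees the value it snapshotted...
    assert_eq!(*old_snapshot, "v1");
    // ...while any reader that calls get() from now on sees the new one.
    assert_eq!(*atomic_arc.get(), "v2");
}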
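Note (not part of the patch): Db::read and Db::write project FromBytes/AsBytes types onto the mmap through zerocopy's LayoutVerified, exactly as Db::new does for the Header. A standalone sketch of that pattern against a plain byte buffer, using only the zerocopy 0.6 items the patch already imports; the Pointer type is a made-up stand-in for FilePointer.

use zerocopy::{AsBytes, FromBytes, LayoutVerified, Unaligned, LE};

type U64 = zerocopy::byteorder::U64<LE>;

#[derive(Clone, Copy, FromBytes, AsBytes, Unaligned)]
#[repr(transparent)]
struct Pointer(U64);

fn main() {
    // Stand-in for the mmap: any buffer of exactly size_of::<Pointer>() bytes.
    let mut buf = [0u8; 8];

    // Write a Pointer into the buffer in place, as Db::write does at a FileRange.
    LayoutVerified::<_, Pointer>::new(&mut buf[..])
        .unwrap()
        .write(Pointer(U64::new(16)));

    // Read it back by value, as Db::read does.
    let read: Pointer = LayoutVerified::<_, Pointer>::new(&buf[..]).unwrap().read();
    assert_eq!(read.0.get(), 16);

    // The stored representation is little-endian regardless of host endianness.
    assert_eq!(buf, 16u64.to_le_bytes());
}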
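Note (not part of the patch): TransactionHandle::resize is the copy-on-write step the crate name refers to: the bytes behind an existing FileRange are never edited in place; a fresh range is allocated, min(old, new) bytes are copied across (the non-overlap asserts guard that copy), and only the copy is handed out for mutation, so a reader holding the old range keeps a consistent view until the root is swapped. A self-contained model of that step over a plain Vec<u8>, with a bump allocation standing in for the still-todo!() Allocator; cow_modify is a made-up name.

use std::ops::Range;

/// Copy-on-write "modify": allocate a new region at the end of the buffer,
/// copy the old bytes into it, and return the new region. The old bytes are
/// left untouched, mirroring TransactionHandle::resize.
fn cow_modify(file: &mut Vec<u8>, old: Range<usize>, new_len: usize) -> Range<usize> {
    // "Allocate" by growing the buffer; the real crate would ask its Allocator.
    let new_start = file.len();
    file.resize(new_start + new_len, 0);

    // Copy min(old len, new len) bytes, like resize() does after its overlap asserts.
    let n = old.len().min(new_len);
    let (old_half, new_half) = file.split_at_mut(new_start);
    new_half[..n].copy_from_slice(&old_half[old.start..old.start + n]);

    new_start..new_start + new_len
}

fn main() {
    let mut file = b"hello".to_vec();

    // A writer "modifies" bytes 0..5: it really gets a fresh copy at 5..10.
    let new = cow_modify(&mut file, 0..5, 5);
    file[new.start] = b'J';

    // A reader still holding the old range sees unchanged data...
    assert_eq!(&file[0..5], b"hello");
    // ...while the writer's copy carries the edit.
    assert_eq!(&file[new], b"Jello");
}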