refactor-fvs

This commit is contained in:
Aleksey Kladov 2019-01-26 15:11:47 +03:00
parent 3feaf2a008
commit 20d7a431fd
3 changed files with 254 additions and 345 deletions

View File

@ -1,19 +1,23 @@
use std::{fs, sync::Arc, thread}; use std::{
fs,
use crossbeam_channel::{Receiver, Sender}; path::{Path, PathBuf},
sync::{mpsc, Arc},
thread,
time::Duration,
};
use crossbeam_channel::{Receiver, Sender, SendError};
use relative_path::RelativePathBuf; use relative_path::RelativePathBuf;
use thread_worker::WorkerHandle; use thread_worker::WorkerHandle;
use walkdir::WalkDir; use walkdir::WalkDir;
use parking_lot::Mutex;
use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as _Watcher};
mod watcher; use crate::{RootConfig, Roots, VfsRoot};
use watcher::Watcher;
use crate::{RootFilter, Roots, VfsRoot};
pub(crate) enum Task { pub(crate) enum Task {
AddRoot { AddRoot {
root: VfsRoot, root: VfsRoot,
filter: Arc<RootFilter>, config: Arc<RootConfig>,
}, },
} }
@ -39,6 +43,15 @@ pub enum TaskResult {
}, },
} }
#[derive(Debug)]
enum ChangeKind {
Create,
Write,
Remove,
}
const WATCHER_DELAY: Duration = Duration::from_millis(250);
pub(crate) struct Worker { pub(crate) struct Worker {
worker: thread_worker::Worker<Task, TaskResult>, worker: thread_worker::Worker<Task, TaskResult>,
worker_handle: WorkerHandle, worker_handle: WorkerHandle,
@ -48,21 +61,36 @@ impl Worker {
pub(crate) fn start(roots: Arc<Roots>) -> Worker { pub(crate) fn start(roots: Arc<Roots>) -> Worker {
let (worker, worker_handle) = let (worker, worker_handle) =
thread_worker::spawn("vfs", 128, move |input_receiver, output_sender| { thread_worker::spawn("vfs", 128, move |input_receiver, output_sender| {
let mut watcher = match Watcher::start(roots, output_sender.clone()) { let (notify_sender, notify_receiver) = mpsc::channel();
Ok(w) => Some(w), let watcher = notify::watcher(notify_sender, WATCHER_DELAY)
Err(e) => { .map_err(|e| log::error!("failed to spawn notify {}", e))
log::error!("could not start watcher: {}", e); .ok();
None let ctx = WatcherCtx {
} roots,
watcher: Arc::new(Mutex::new(watcher)),
sender: output_sender,
}; };
let res = input_receiver let thread = thread::spawn({
.into_iter() let ctx = ctx.clone();
.filter_map(|t| handle_task(t, &mut watcher)) move || {
.try_for_each(|it| output_sender.send(it)); let _ = notify_receiver
if let Some(watcher) = watcher { .into_iter()
let _ = watcher.shutdown(); // forward relevant events only
.try_for_each(|change| ctx.handle_debounced_event(change));
}
});
let res1 = input_receiver.into_iter().try_for_each(|t| match t {
Task::AddRoot { root, config } => watch_root(&ctx, root, Arc::clone(&config)),
});
drop(ctx.watcher.lock().take());
drop(ctx);
let res2 = thread.join();
match &res2 {
Ok(()) => log::info!("... Watcher terminated with ok"),
Err(_) => log::error!("... Watcher terminated with err"),
} }
res.unwrap() res1.unwrap();
res2.unwrap();
}); });
Worker { Worker {
worker, worker,
@ -84,46 +112,141 @@ impl Worker {
} }
} }
fn handle_task(task: Task, watcher: &mut Option<Watcher>) -> Option<TaskResult> { fn watch_root(
match task { woker: &WatcherCtx,
Task::AddRoot { root, filter } => { root: VfsRoot,
if let Some(watcher) = watcher { config: Arc<RootConfig>,
watcher.watch_root(&filter) ) -> Result<(), SendError<TaskResult>> {
let mut guard = woker.watcher.lock();
log::debug!("loading {} ...", config.root.as_path().display());
let files = watch_recursive(guard.as_mut(), config.root.as_path(), &*config)
.into_iter()
.filter_map(|path| {
let abs_path = path.to_path(&config.root);
let text = fs::read_to_string(abs_path)
.map_err(|e| log::warn!("watcher error: {}", e))
.ok()?;
Some((path, text))
})
.collect();
woker
.sender
.send(TaskResult::BulkLoadRoot { root, files })?;
log::debug!("... loaded {}", config.root.as_path().display());
Ok(())
}
#[derive(Clone)]
struct WatcherCtx {
roots: Arc<Roots>,
watcher: Arc<Mutex<Option<RecommendedWatcher>>>,
sender: Sender<TaskResult>,
}
impl WatcherCtx {
fn handle_debounced_event(&self, ev: DebouncedEvent) -> Result<(), SendError<TaskResult>> {
match ev {
DebouncedEvent::NoticeWrite(_)
| DebouncedEvent::NoticeRemove(_)
| DebouncedEvent::Chmod(_) => {
// ignore
}
DebouncedEvent::Rescan => {
// TODO rescan all roots
}
DebouncedEvent::Create(path) => {
self.handle_change(path, ChangeKind::Create)?;
}
DebouncedEvent::Write(path) => {
self.handle_change(path, ChangeKind::Write)?;
}
DebouncedEvent::Remove(path) => {
self.handle_change(path, ChangeKind::Remove)?;
}
DebouncedEvent::Rename(src, dst) => {
self.handle_change(src, ChangeKind::Remove)?;
self.handle_change(dst, ChangeKind::Create)?;
}
DebouncedEvent::Error(err, path) => {
// TODO should we reload the file contents?
log::warn!("watcher error \"{}\", {:?}", err, path);
} }
log::debug!("loading {} ...", filter.root.as_path().display());
let files = load_root(filter.as_ref());
log::debug!("... loaded {}", filter.root.as_path().display());
Some(TaskResult::BulkLoadRoot { root, files })
} }
Ok(())
}
fn handle_change(&self, path: PathBuf, kind: ChangeKind) -> Result<(), SendError<TaskResult>> {
let (root, rel_path) = match self.roots.find(&path) {
None => return Ok(()),
Some(it) => it,
};
let config = &self.roots[root];
match kind {
ChangeKind::Create => {
let mut paths = Vec::new();
if path.is_dir() {
let mut guard = self.watcher.lock();
paths.extend(watch_recursive(guard.as_mut(), &path, &config));
} else {
paths.push(rel_path);
}
paths
.into_iter()
.filter_map(|rel_path| {
let abs_path = rel_path.to_path(&config.root);
let text = fs::read_to_string(&abs_path)
.map_err(|e| log::warn!("watcher failed {}", e))
.ok()?;
Some((rel_path, text))
})
.try_for_each(|(path, text)| {
self.sender
.send(TaskResult::AddSingleFile { root, path, text })
})?
}
ChangeKind::Write => match fs::read_to_string(&path) {
Err(e) => log::warn!("watcher failed {}", e),
Ok(text) => self.sender.send(TaskResult::ChangeSingleFile {
root,
path: rel_path,
text,
})?,
},
ChangeKind::Remove => self.sender.send(TaskResult::RemoveSingleFile {
root,
path: rel_path,
})?,
}
Ok(())
} }
} }
fn load_root(filter: &RootFilter) -> Vec<(RelativePathBuf, String)> { fn watch_recursive(
let mut res = Vec::new(); mut watcher: Option<&mut RecommendedWatcher>,
for entry in WalkDir::new(&filter.root) dir: &Path,
config: &RootConfig,
) -> Vec<RelativePathBuf> {
let mut files = Vec::new();
for entry in WalkDir::new(dir)
.into_iter() .into_iter()
.filter_entry(filter.entry_filter()) .filter_entry(|it| config.contains(it.path()).is_some())
.filter_map(|it| it.map_err(|e| log::warn!("watcher error: {}", e)).ok())
{ {
let entry = match entry { if entry.file_type().is_dir() {
Ok(entry) => entry, if let Some(watcher) = &mut watcher {
Err(e) => { watch_one(watcher, entry.path());
log::warn!("watcher error: {}", e);
continue;
} }
}; } else {
if !entry.file_type().is_file() { let path = config.contains(entry.path()).unwrap();
continue; files.push(path.to_owned());
} }
let path = entry.path();
let text = match fs::read_to_string(path) {
Ok(text) => text,
Err(e) => {
log::warn!("watcher error: {}", e);
continue;
}
};
let path = RelativePathBuf::from_path(path.strip_prefix(&filter.root).unwrap()).unwrap();
res.push((path.to_owned(), text))
} }
res files
}
fn watch_one(watcher: &mut RecommendedWatcher, dir: &Path) {
match watcher.watch(dir, RecursiveMode::NonRecursive) {
Ok(()) => log::debug!("watching \"{}\"", dir.display()),
Err(e) => log::warn!("could not watch \"{}\": {}", dir.display(), e),
}
} }

View File

@ -1,200 +0,0 @@
use crate::{io, RootFilter, Roots, VfsRoot};
use crossbeam_channel::Sender;
use drop_bomb::DropBomb;
use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as NotifyWatcher};
use parking_lot::Mutex;
use std::{
fs,
path::{Path, PathBuf},
sync::{mpsc, Arc},
thread,
time::Duration,
};
use walkdir::WalkDir;
#[derive(Debug)]
enum ChangeKind {
Create,
Write,
Remove,
}
const WATCHER_DELAY: Duration = Duration::from_millis(250);
pub(crate) struct Watcher {
thread: thread::JoinHandle<()>,
bomb: DropBomb,
watcher: Arc<Mutex<Option<RecommendedWatcher>>>,
}
impl Watcher {
pub(crate) fn start(
roots: Arc<Roots>,
output_sender: Sender<io::TaskResult>,
) -> Result<Watcher, Box<std::error::Error>> {
let (input_sender, input_receiver) = mpsc::channel();
let watcher = Arc::new(Mutex::new(Some(notify::watcher(
input_sender,
WATCHER_DELAY,
)?)));
let sender = output_sender.clone();
let watcher_clone = watcher.clone();
let thread = thread::spawn(move || {
let worker = WatcherWorker {
roots,
watcher: watcher_clone,
sender,
};
input_receiver
.into_iter()
// forward relevant events only
.try_for_each(|change| worker.handle_debounced_event(change))
.unwrap()
});
Ok(Watcher {
thread,
watcher,
bomb: DropBomb::new(format!("Watcher was not shutdown")),
})
}
pub fn watch_root(&mut self, filter: &RootFilter) {
for res in WalkDir::new(&filter.root)
.into_iter()
.filter_entry(filter.entry_filter())
{
match res {
Ok(entry) => {
if entry.file_type().is_dir() {
watch_one(self.watcher.as_ref(), entry.path());
}
}
Err(e) => log::warn!("watcher error: {}", e),
}
}
}
pub fn shutdown(mut self) -> thread::Result<()> {
self.bomb.defuse();
drop(self.watcher.lock().take());
let res = self.thread.join();
match &res {
Ok(()) => log::info!("... Watcher terminated with ok"),
Err(_) => log::error!("... Watcher terminated with err"),
}
res
}
}
struct WatcherWorker {
watcher: Arc<Mutex<Option<RecommendedWatcher>>>,
roots: Arc<Roots>,
sender: Sender<io::TaskResult>,
}
impl WatcherWorker {
fn handle_debounced_event(&self, ev: DebouncedEvent) -> Result<(), Box<std::error::Error>> {
match ev {
DebouncedEvent::NoticeWrite(_)
| DebouncedEvent::NoticeRemove(_)
| DebouncedEvent::Chmod(_) => {
// ignore
}
DebouncedEvent::Rescan => {
// TODO rescan all roots
}
DebouncedEvent::Create(path) => {
self.handle_change(path, ChangeKind::Create);
}
DebouncedEvent::Write(path) => {
self.handle_change(path, ChangeKind::Write);
}
DebouncedEvent::Remove(path) => {
self.handle_change(path, ChangeKind::Remove);
}
DebouncedEvent::Rename(src, dst) => {
self.handle_change(src, ChangeKind::Remove);
self.handle_change(dst, ChangeKind::Create);
}
DebouncedEvent::Error(err, path) => {
// TODO should we reload the file contents?
log::warn!("watcher error \"{}\", {:?}", err, path);
}
}
Ok(())
}
fn handle_change(&self, path: PathBuf, kind: ChangeKind) {
if let Err(e) = self.try_handle_change(path, kind) {
log::warn!("watcher error: {}", e)
}
}
fn try_handle_change(
&self,
path: PathBuf,
kind: ChangeKind,
) -> Result<(), Box<std::error::Error>> {
let (root, rel_path) = match self.roots.find(&path) {
Some(x) => x,
None => return Ok(()),
};
match kind {
ChangeKind::Create => {
if path.is_dir() {
self.watch_recursive(&path, root);
} else {
let text = fs::read_to_string(&path)?;
self.sender.send(io::TaskResult::AddSingleFile {
root,
path: rel_path,
text,
})?
}
}
ChangeKind::Write => {
let text = fs::read_to_string(&path)?;
self.sender.send(io::TaskResult::ChangeSingleFile {
root,
path: rel_path,
text,
})?
}
ChangeKind::Remove => self.sender.send(io::TaskResult::RemoveSingleFile {
root,
path: rel_path,
})?,
}
Ok(())
}
fn watch_recursive(&self, dir: &Path, root: VfsRoot) {
let filter = &self.roots[root];
for res in WalkDir::new(dir)
.into_iter()
.filter_entry(filter.entry_filter())
{
match res {
Ok(entry) => {
if entry.file_type().is_dir() {
watch_one(self.watcher.as_ref(), entry.path());
} else {
// emit only for files otherwise we will cause watch_recursive to be called again with a dir that we are already watching
// emit as create because we haven't seen it yet
self.handle_change(entry.path().to_path_buf(), ChangeKind::Create);
}
}
Err(e) => log::warn!("watcher error: {}", e),
}
}
}
}
fn watch_one(watcher: &Mutex<Option<RecommendedWatcher>>, dir: &Path) {
if let Some(watcher) = watcher.lock().as_mut() {
match watcher.watch(dir, RecursiveMode::NonRecursive) {
Ok(()) => log::debug!("watching \"{}\"", dir.display()),
Err(e) => log::warn!("could not watch \"{}\": {}", dir.display(), e),
}
}
}

View File

@ -18,94 +18,78 @@ mod io;
use std::{ use std::{
cmp::Reverse, cmp::Reverse,
fmt, fs, mem, fmt, fs, mem,
ops::{Deref, DerefMut},
path::{Path, PathBuf}, path::{Path, PathBuf},
sync::Arc, sync::Arc,
thread, thread,
}; };
use crossbeam_channel::Receiver; use crossbeam_channel::Receiver;
use ra_arena::{impl_arena_id, Arena, RawId}; use ra_arena::{impl_arena_id, Arena, RawId, map::ArenaMap};
use relative_path::{Component, RelativePath, RelativePathBuf}; use relative_path::{Component, RelativePath, RelativePathBuf};
use rustc_hash::{FxHashMap, FxHashSet}; use rustc_hash::{FxHashMap, FxHashSet};
use walkdir::DirEntry;
pub use crate::io::TaskResult as VfsTask; pub use crate::io::TaskResult as VfsTask;
use io::{TaskResult, Worker}; use io::{TaskResult, Worker};
/// `RootFilter` is a predicate that checks if a file can belong to a root. If
/// several filters match a file (nested dirs), the most nested one wins.
pub(crate) struct RootFilter {
root: PathBuf,
filter: fn(&Path, &RelativePath) -> bool,
excluded_dirs: Vec<PathBuf>,
}
impl RootFilter {
fn new(root: PathBuf, excluded_dirs: Vec<PathBuf>) -> RootFilter {
RootFilter {
root,
filter: default_filter,
excluded_dirs,
}
}
/// Check if this root can contain `path`. NB: even if this returns
/// true, the `path` might actually be conained in some nested root.
pub(crate) fn can_contain(&self, path: &Path) -> Option<RelativePathBuf> {
let rel_path = path.strip_prefix(&self.root).ok()?;
let rel_path = RelativePathBuf::from_path(rel_path).ok()?;
if !(self.filter)(path, rel_path.as_relative_path()) {
return None;
}
Some(rel_path)
}
pub(crate) fn entry_filter<'a>(&'a self) -> impl FnMut(&DirEntry) -> bool + 'a {
move |entry: &DirEntry| {
if entry.file_type().is_dir() && self.excluded_dirs.iter().any(|it| it == entry.path())
{
// do not walk nested roots
false
} else {
self.can_contain(entry.path()).is_some()
}
}
}
}
pub(crate) fn default_filter(path: &Path, rel_path: &RelativePath) -> bool {
if path.is_dir() {
for (i, c) in rel_path.components().enumerate() {
if let Component::Normal(c) = c {
// TODO hardcoded for now
if (i == 0 && c == "target") || c == ".git" || c == "node_modules" {
return false;
}
}
}
true
} else {
rel_path.extension() == Some("rs")
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct VfsRoot(pub RawId); pub struct VfsRoot(pub RawId);
impl_arena_id!(VfsRoot); impl_arena_id!(VfsRoot);
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] /// Describes the contents of a single source root.
pub struct VfsFile(pub RawId); ///
impl_arena_id!(VfsFile); /// `RootConfig` can be thought of as a glob pattern like `src/**.rs` whihc
/// specifes the source root or as a function whihc takes a `PathBuf` and
struct VfsFileData { /// returns `true` iff path belongs to the source root
root: VfsRoot, pub(crate) struct RootConfig {
path: RelativePathBuf, root: PathBuf,
is_overlayed: bool, excluded_dirs: Vec<PathBuf>,
text: Arc<String>,
} }
pub(crate) struct Roots { pub(crate) struct Roots {
roots: Arena<VfsRoot, Arc<RootFilter>>, roots: Arena<VfsRoot, Arc<RootConfig>>,
}
impl std::ops::Deref for Roots {
type Target = Arena<VfsRoot, Arc<RootConfig>>;
fn deref(&self) -> &Self::Target {
&self.roots
}
}
impl RootConfig {
fn new(root: PathBuf, excluded_dirs: Vec<PathBuf>) -> RootConfig {
RootConfig {
root,
excluded_dirs,
}
}
/// Cheks if root contains a path and returns a root-relative path.
pub(crate) fn contains(&self, path: &Path) -> Option<RelativePathBuf> {
// First, check excluded dirs
if self.excluded_dirs.iter().any(|it| path.starts_with(it)) {
return None;
}
let rel_path = path.strip_prefix(&self.root).ok()?;
let rel_path = RelativePathBuf::from_path(rel_path).ok()?;
// Ignore some common directories.
//
// FIXME: don't hard-code, specify at source-root creation time using
// gitignore
for (i, c) in rel_path.components().enumerate() {
if let Component::Normal(c) = c {
if (i == 0 && c == "target") || c == ".git" || c == "node_modules" {
return None;
}
}
}
if path.is_file() && rel_path.extension() != Some("rs") {
return None;
}
Some(rel_path)
}
} }
impl Roots { impl Roots {
@ -120,59 +104,61 @@ impl Roots {
.map(|it| it.clone()) .map(|it| it.clone())
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let root_filter = Arc::new(RootFilter::new(path.clone(), nested_roots)); let config = Arc::new(RootConfig::new(path.clone(), nested_roots));
roots.alloc(root_filter.clone()); roots.alloc(config.clone());
} }
Roots { roots } Roots { roots }
} }
pub(crate) fn find(&self, path: &Path) -> Option<(VfsRoot, RelativePathBuf)> { pub(crate) fn find(&self, path: &Path) -> Option<(VfsRoot, RelativePathBuf)> {
self.roots self.roots
.iter() .iter()
.find_map(|(root, data)| data.can_contain(path).map(|it| (root, it))) .find_map(|(root, data)| data.contains(path).map(|it| (root, it)))
} }
} }
impl Deref for Roots { #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
type Target = Arena<VfsRoot, Arc<RootFilter>>; pub struct VfsFile(pub RawId);
fn deref(&self) -> &Self::Target { impl_arena_id!(VfsFile);
&self.roots
}
}
impl DerefMut for Roots { struct VfsFileData {
fn deref_mut(&mut self) -> &mut Self::Target { root: VfsRoot,
&mut self.roots path: RelativePathBuf,
} is_overlayed: bool,
text: Arc<String>,
} }
pub struct Vfs { pub struct Vfs {
roots: Arc<Roots>, roots: Arc<Roots>,
files: Arena<VfsFile, VfsFileData>, files: Arena<VfsFile, VfsFileData>,
root2files: FxHashMap<VfsRoot, FxHashSet<VfsFile>>, root2files: ArenaMap<VfsRoot, FxHashSet<VfsFile>>,
pending_changes: Vec<VfsChange>, pending_changes: Vec<VfsChange>,
worker: Worker, worker: Worker,
} }
impl fmt::Debug for Vfs { impl fmt::Debug for Vfs {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.write_str("Vfs { ... }") f.debug_struct("Vfs")
.field("n_roots", &self.roots.len())
.field("n_files", &self.files.len())
.field("n_pending_changes", &self.pending_changes.len())
.finish()
} }
} }
impl Vfs { impl Vfs {
pub fn new(roots: Vec<PathBuf>) -> (Vfs, Vec<VfsRoot>) { pub fn new(roots: Vec<PathBuf>) -> (Vfs, Vec<VfsRoot>) {
let roots = Arc::new(Roots::new(roots)); let roots = Arc::new(Roots::new(roots));
let worker = io::Worker::start(roots.clone()); let worker = io::Worker::start(Arc::clone(&roots));
let mut root2files = FxHashMap::default(); let mut root2files = ArenaMap::default();
for (root, filter) in roots.iter() { for (root, config) in roots.iter() {
root2files.insert(root, Default::default()); root2files.insert(root, Default::default());
worker worker
.sender() .sender()
.send(io::Task::AddRoot { .send(io::Task::AddRoot {
root, root,
filter: filter.clone(), config: Arc::clone(config),
}) })
.unwrap(); .unwrap();
} }
@ -242,7 +228,7 @@ impl Vfs {
let mut cur_files = Vec::new(); let mut cur_files = Vec::new();
// While we were scanning the root in the backgound, a file might have // While we were scanning the root in the backgound, a file might have
// been open in the editor, so we need to account for that. // been open in the editor, so we need to account for that.
let exising = self.root2files[&root] let exising = self.root2files[root]
.iter() .iter()
.map(|&file| (self.files[file].path.clone(), file)) .map(|&file| (self.files[file].path.clone(), file))
.collect::<FxHashMap<_, _>>(); .collect::<FxHashMap<_, _>>();
@ -384,7 +370,7 @@ impl Vfs {
is_overlayed, is_overlayed,
}; };
let file = self.files.alloc(data); let file = self.files.alloc(data);
self.root2files.get_mut(&root).unwrap().insert(file); self.root2files.get_mut(root).unwrap().insert(file);
file file
} }
@ -399,7 +385,7 @@ impl Vfs {
self.files[file].text = Default::default(); self.files[file].text = Default::default();
self.files[file].path = Default::default(); self.files[file].path = Default::default();
let root = self.files[file].root; let root = self.files[file].root;
let removed = self.root2files.get_mut(&root).unwrap().remove(&file); let removed = self.root2files.get_mut(root).unwrap().remove(&file);
assert!(removed); assert!(removed);
} }
@ -410,7 +396,7 @@ impl Vfs {
} }
fn find_file(&self, root: VfsRoot, path: &RelativePath) -> Option<VfsFile> { fn find_file(&self, root: VfsRoot, path: &RelativePath) -> Option<VfsFile> {
self.root2files[&root] self.root2files[root]
.iter() .iter()
.map(|&it| it) .map(|&it| it)
.find(|&file| self.files[file].path == path) .find(|&file| self.files[file].path == path)