perf(workspace-state): compress delta history, cache gitignore lookups, tighten debounce
This commit is contained in:
@@ -19,8 +19,8 @@ use crate::web::event_bridge::{emit_event, EventEmitter};
|
|||||||
pub const WORKSPACE_STATE_PROTOCOL_VERSION: u16 = 1;
|
pub const WORKSPACE_STATE_PROTOCOL_VERSION: u16 = 1;
|
||||||
|
|
||||||
const WATCH_IGNORED_DIRS: &[&str] = &["__pycache__"];
|
const WATCH_IGNORED_DIRS: &[&str] = &["__pycache__"];
|
||||||
const WATCH_DEBOUNCE_MS: u64 = 1_000;
|
const WATCH_DEBOUNCE_MS: u64 = 300;
|
||||||
const WATCH_MAX_BATCH_WINDOW_MS: u64 = 3_000;
|
const WATCH_MAX_BATCH_WINDOW_MS: u64 = 1_500;
|
||||||
const WATCH_MAX_CHANGED_PATHS: usize = 2_000;
|
const WATCH_MAX_CHANGED_PATHS: usize = 2_000;
|
||||||
const WATCH_EVENT_CHANNEL_CAPACITY: usize = 2_048;
|
const WATCH_EVENT_CHANNEL_CAPACITY: usize = 2_048;
|
||||||
const RECENT_EVENT_CAPACITY: usize = 24;
|
const RECENT_EVENT_CAPACITY: usize = 24;
|
||||||
@@ -82,7 +82,7 @@ struct WorkspaceStateCore {
|
|||||||
seq: u64,
|
seq: u64,
|
||||||
tree_snapshot: Vec<FileTreeNode>,
|
tree_snapshot: Vec<FileTreeNode>,
|
||||||
git_snapshot: Vec<WorkspaceGitEntry>,
|
git_snapshot: Vec<WorkspaceGitEntry>,
|
||||||
recent_events: VecDeque<WorkspaceDeltaEnvelope>,
|
recent_events: VecDeque<Arc<WorkspaceDeltaEnvelope>>,
|
||||||
recent_capacity: usize,
|
recent_capacity: usize,
|
||||||
degraded: bool,
|
degraded: bool,
|
||||||
is_git_repo: bool,
|
is_git_repo: bool,
|
||||||
@@ -120,13 +120,13 @@ impl WorkspaceStateCore {
|
|||||||
self.apply_payload(&payload);
|
self.apply_payload(&payload);
|
||||||
}
|
}
|
||||||
|
|
||||||
let envelope = WorkspaceDeltaEnvelope {
|
let envelope = Arc::new(WorkspaceDeltaEnvelope {
|
||||||
seq: self.seq,
|
seq: self.seq,
|
||||||
kind: kind.clone(),
|
kind: kind.clone(),
|
||||||
payload: payload.clone(),
|
payload: payload.clone(),
|
||||||
requires_resync,
|
requires_resync,
|
||||||
changed_paths: changed_paths.clone(),
|
changed_paths: changed_paths.clone(),
|
||||||
};
|
});
|
||||||
self.push_recent_event(envelope);
|
self.push_recent_event(envelope);
|
||||||
|
|
||||||
WorkspaceStateEvent {
|
WorkspaceStateEvent {
|
||||||
@@ -147,7 +147,7 @@ impl WorkspaceStateCore {
|
|||||||
.recent_events
|
.recent_events
|
||||||
.iter()
|
.iter()
|
||||||
.filter(|event| event.seq > since)
|
.filter(|event| event.seq > since)
|
||||||
.cloned()
|
.map(|event| (**event).clone())
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
return WorkspaceSnapshotResponse {
|
return WorkspaceSnapshotResponse {
|
||||||
@@ -191,17 +191,49 @@ impl WorkspaceStateCore {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn push_recent_event(&mut self, event: WorkspaceDeltaEnvelope) {
|
fn push_recent_event(&mut self, event: Arc<WorkspaceDeltaEnvelope>) {
|
||||||
// Tree replace events carry large payloads. Keeping a long history of
|
// Tree/Git replace deltas are idempotent full snapshots — keeping older
|
||||||
// them can cause unnecessary memory growth on large workspaces.
|
// copies wastes memory and doesn't change replay outcomes. Strip the
|
||||||
|
// same-kind deltas from earlier envelopes but preserve their seq slot
|
||||||
|
// and `changed_paths` so strict seq continuity and lazy-load
|
||||||
|
// invalidation still work.
|
||||||
let has_tree_replace = event
|
let has_tree_replace = event
|
||||||
.payload
|
.payload
|
||||||
.iter()
|
.iter()
|
||||||
.any(|delta| matches!(delta, WorkspaceDelta::TreeReplace { .. }));
|
.any(|delta| matches!(delta, WorkspaceDelta::TreeReplace { .. }));
|
||||||
if has_tree_replace {
|
let has_git_replace = event
|
||||||
self.recent_events.clear();
|
.payload
|
||||||
self.recent_events.push_back(event);
|
.iter()
|
||||||
return;
|
.any(|delta| matches!(delta, WorkspaceDelta::GitReplace { .. }));
|
||||||
|
|
||||||
|
if has_tree_replace || has_git_replace {
|
||||||
|
for slot in self.recent_events.iter_mut() {
|
||||||
|
let needs_rewrite = slot.payload.iter().any(|delta| match delta {
|
||||||
|
WorkspaceDelta::TreeReplace { .. } => has_tree_replace,
|
||||||
|
WorkspaceDelta::GitReplace { .. } => has_git_replace,
|
||||||
|
WorkspaceDelta::Meta { .. } => false,
|
||||||
|
});
|
||||||
|
if !needs_rewrite {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
let remaining: Vec<WorkspaceDelta> = slot
|
||||||
|
.payload
|
||||||
|
.iter()
|
||||||
|
.filter(|delta| match delta {
|
||||||
|
WorkspaceDelta::TreeReplace { .. } => !has_tree_replace,
|
||||||
|
WorkspaceDelta::GitReplace { .. } => !has_git_replace,
|
||||||
|
WorkspaceDelta::Meta { .. } => true,
|
||||||
|
})
|
||||||
|
.cloned()
|
||||||
|
.collect();
|
||||||
|
*slot = Arc::new(WorkspaceDeltaEnvelope {
|
||||||
|
seq: slot.seq,
|
||||||
|
kind: slot.kind.clone(),
|
||||||
|
payload: remaining,
|
||||||
|
requires_resync: slot.requires_resync,
|
||||||
|
changed_paths: slot.changed_paths.clone(),
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
self.recent_events.push_back(event);
|
self.recent_events.push_back(event);
|
||||||
@@ -401,6 +433,65 @@ fn git_check_ignored_paths(
|
|||||||
Ok(ignored)
|
Ok(ignored)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Copy)]
|
||||||
|
struct GitignoreCacheEntry {
|
||||||
|
ignored: bool,
|
||||||
|
expires_at: Instant,
|
||||||
|
}
|
||||||
|
|
||||||
|
const GITIGNORE_CACHE_TTL: Duration = Duration::from_secs(30);
|
||||||
|
const GITIGNORE_CACHE_MAX_ENTRIES: usize = 4_096;
|
||||||
|
|
||||||
|
static GITIGNORE_CACHE: LazyLock<Mutex<HashMap<(String, String), GitignoreCacheEntry>>> =
|
||||||
|
LazyLock::new(|| Mutex::new(HashMap::new()));
|
||||||
|
|
||||||
|
fn gitignore_cache_lookup(root: &str, path: &str) -> Option<bool> {
|
||||||
|
let mut cache = GITIGNORE_CACHE.lock().ok()?;
|
||||||
|
let key = (root.to_string(), path.to_string());
|
||||||
|
let entry = *cache.get(&key)?;
|
||||||
|
if entry.expires_at <= Instant::now() {
|
||||||
|
cache.remove(&key);
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
Some(entry.ignored)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn gitignore_cache_put_batch(root: &str, results: impl IntoIterator<Item = (String, bool)>) {
|
||||||
|
let Ok(mut cache) = GITIGNORE_CACHE.lock() else {
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
|
if cache.len() >= GITIGNORE_CACHE_MAX_ENTRIES {
|
||||||
|
let mut sorted: Vec<_> = cache
|
||||||
|
.iter()
|
||||||
|
.map(|(k, v)| (k.clone(), v.expires_at))
|
||||||
|
.collect();
|
||||||
|
sorted.sort_by_key(|(_, exp)| *exp);
|
||||||
|
let drop_count = cache.len() / 4;
|
||||||
|
for (k, _) in sorted.into_iter().take(drop_count) {
|
||||||
|
cache.remove(&k);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let expires_at = Instant::now() + GITIGNORE_CACHE_TTL;
|
||||||
|
for (path, ignored) in results {
|
||||||
|
cache.insert(
|
||||||
|
(root.to_string(), path),
|
||||||
|
GitignoreCacheEntry {
|
||||||
|
ignored,
|
||||||
|
expires_at,
|
||||||
|
},
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn gitignore_cache_invalidate_root(root: &str) {
|
||||||
|
let Ok(mut cache) = GITIGNORE_CACHE.lock() else {
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
cache.retain(|(r, _), _| r != root);
|
||||||
|
}
|
||||||
|
|
||||||
async fn should_refresh_git_status_for_paths(root_display: &str, changed_paths: &[String]) -> bool {
|
async fn should_refresh_git_status_for_paths(root_display: &str, changed_paths: &[String]) -> bool {
|
||||||
if changed_paths.is_empty() {
|
if changed_paths.is_empty() {
|
||||||
return true;
|
return true;
|
||||||
@@ -409,6 +500,9 @@ async fn should_refresh_git_status_for_paths(root_display: &str, changed_paths:
|
|||||||
let mut candidates: Vec<String> = Vec::new();
|
let mut candidates: Vec<String> = Vec::new();
|
||||||
for path in changed_paths {
|
for path in changed_paths {
|
||||||
if is_git_metadata_rel_path(path) || is_gitignore_rel_path(path) {
|
if is_git_metadata_rel_path(path) || is_gitignore_rel_path(path) {
|
||||||
|
// `.gitignore` or `.git/*` changed — our ignore cache is likely
|
||||||
|
// stale; drop it before returning.
|
||||||
|
gitignore_cache_invalidate_root(root_display);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
candidates.push(path.clone());
|
candidates.push(path.clone());
|
||||||
@@ -418,10 +512,24 @@ async fn should_refresh_git_status_for_paths(root_display: &str, changed_paths:
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let mut missing: Vec<String> = Vec::new();
|
||||||
|
for path in &candidates {
|
||||||
|
match gitignore_cache_lookup(root_display, path) {
|
||||||
|
Some(false) => return true, // cached non-ignored → must refresh
|
||||||
|
Some(true) => {}
|
||||||
|
None => missing.push(path.clone()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if missing.is_empty() {
|
||||||
|
// All candidates were cached as ignored — nothing to refresh.
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
let repo_path = root_display.to_string();
|
let repo_path = root_display.to_string();
|
||||||
let candidates_for_check = candidates.clone();
|
let missing_for_check = missing.clone();
|
||||||
let ignored = match tokio::task::spawn_blocking(move || {
|
let ignored = match tokio::task::spawn_blocking(move || {
|
||||||
git_check_ignored_paths(&repo_path, &candidates_for_check)
|
git_check_ignored_paths(&repo_path, &missing_for_check)
|
||||||
})
|
})
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
@@ -430,9 +538,14 @@ async fn should_refresh_git_status_for_paths(root_display: &str, changed_paths:
|
|||||||
_ => return true,
|
_ => return true,
|
||||||
};
|
};
|
||||||
|
|
||||||
candidates
|
let results: Vec<(String, bool)> = missing
|
||||||
.iter()
|
.iter()
|
||||||
.any(|path| !ignored.contains(path.as_str()))
|
.map(|path| (path.clone(), ignored.contains(path.as_str())))
|
||||||
|
.collect();
|
||||||
|
let should_refresh = results.iter().any(|(_, is_ignored)| !is_ignored);
|
||||||
|
gitignore_cache_put_batch(root_display, results);
|
||||||
|
|
||||||
|
should_refresh
|
||||||
}
|
}
|
||||||
|
|
||||||
fn is_allowed_git_watch_path(relative: &Path) -> bool {
|
fn is_allowed_git_watch_path(relative: &Path) -> bool {
|
||||||
@@ -590,9 +703,13 @@ async fn git_numstat_map(path: &str) -> HashMap<String, (i32, i32)> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn collect_git_snapshot(path: &str) -> Result<Vec<WorkspaceGitEntry>, AppCommandError> {
|
async fn collect_git_snapshot(path: &str) -> Result<Vec<WorkspaceGitEntry>, AppCommandError> {
|
||||||
let status_entries = folders::git_status(path.to_string(), Some(true)).await?;
|
// status + numstat don't depend on each other; run concurrently to cut
|
||||||
|
// per-flush latency roughly in half on large repos.
|
||||||
let stats = git_numstat_map(path).await;
|
let (status_entries, stats) = tokio::join!(
|
||||||
|
folders::git_status(path.to_string(), Some(true)),
|
||||||
|
git_numstat_map(path),
|
||||||
|
);
|
||||||
|
let status_entries = status_entries?;
|
||||||
|
|
||||||
let mut result = status_entries
|
let mut result = status_entries
|
||||||
.into_iter()
|
.into_iter()
|
||||||
@@ -1163,4 +1280,108 @@ mod tests {
|
|||||||
assert!(snapshot.tree_snapshot.is_some());
|
assert!(snapshot.tree_snapshot.is_some());
|
||||||
assert!(snapshot.git_snapshot.is_some());
|
assert!(snapshot.git_snapshot.is_some());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn workspace_state_core_same_kind_replace_is_compressed() {
|
||||||
|
let mut core =
|
||||||
|
WorkspaceStateCore::new("/tmp/repo".to_string(), Vec::new(), Vec::new(), false);
|
||||||
|
|
||||||
|
let e1 = core.append_event(
|
||||||
|
"git_delta".to_string(),
|
||||||
|
vec![WorkspaceDelta::GitReplace {
|
||||||
|
entries: vec![WorkspaceGitEntry {
|
||||||
|
path: "a.txt".to_string(),
|
||||||
|
status: "M".to_string(),
|
||||||
|
additions: 1,
|
||||||
|
deletions: 0,
|
||||||
|
}],
|
||||||
|
}],
|
||||||
|
false,
|
||||||
|
vec!["a.txt".to_string()],
|
||||||
|
);
|
||||||
|
|
||||||
|
let e2 = core.append_event(
|
||||||
|
"meta".to_string(),
|
||||||
|
vec![WorkspaceDelta::Meta {
|
||||||
|
reason: "tick".to_string(),
|
||||||
|
}],
|
||||||
|
false,
|
||||||
|
Vec::new(),
|
||||||
|
);
|
||||||
|
|
||||||
|
core.append_event(
|
||||||
|
"git_delta".to_string(),
|
||||||
|
vec![WorkspaceDelta::GitReplace { entries: vec![] }],
|
||||||
|
false,
|
||||||
|
vec!["a.txt".to_string()],
|
||||||
|
);
|
||||||
|
|
||||||
|
let snapshot = core.snapshot(Some(0));
|
||||||
|
assert!(!snapshot.full);
|
||||||
|
assert_eq!(snapshot.deltas.len(), 3);
|
||||||
|
|
||||||
|
let first = snapshot
|
||||||
|
.deltas
|
||||||
|
.iter()
|
||||||
|
.find(|d| d.seq == e1.seq)
|
||||||
|
.expect("e1 still present");
|
||||||
|
assert!(
|
||||||
|
first.payload.is_empty(),
|
||||||
|
"older GitReplace payload should be dropped after compression"
|
||||||
|
);
|
||||||
|
assert_eq!(first.changed_paths, vec!["a.txt".to_string()]);
|
||||||
|
|
||||||
|
let meta = snapshot
|
||||||
|
.deltas
|
||||||
|
.iter()
|
||||||
|
.find(|d| d.seq == e2.seq)
|
||||||
|
.expect("meta still present");
|
||||||
|
assert_eq!(meta.payload.len(), 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn workspace_state_core_tree_replace_compresses_older_tree_but_keeps_git() {
|
||||||
|
let mut core =
|
||||||
|
WorkspaceStateCore::new("/tmp/repo".to_string(), Vec::new(), Vec::new(), false);
|
||||||
|
|
||||||
|
core.append_event(
|
||||||
|
"fs_delta".to_string(),
|
||||||
|
vec![WorkspaceDelta::TreeReplace { nodes: Vec::new() }],
|
||||||
|
false,
|
||||||
|
Vec::new(),
|
||||||
|
);
|
||||||
|
let git_event = core.append_event(
|
||||||
|
"git_delta".to_string(),
|
||||||
|
vec![WorkspaceDelta::GitReplace {
|
||||||
|
entries: vec![WorkspaceGitEntry {
|
||||||
|
path: "b.txt".to_string(),
|
||||||
|
status: "??".to_string(),
|
||||||
|
additions: 0,
|
||||||
|
deletions: 0,
|
||||||
|
}],
|
||||||
|
}],
|
||||||
|
false,
|
||||||
|
Vec::new(),
|
||||||
|
);
|
||||||
|
core.append_event(
|
||||||
|
"fs_delta".to_string(),
|
||||||
|
vec![WorkspaceDelta::TreeReplace { nodes: Vec::new() }],
|
||||||
|
false,
|
||||||
|
Vec::new(),
|
||||||
|
);
|
||||||
|
|
||||||
|
let snapshot = core.snapshot(Some(0));
|
||||||
|
assert!(!snapshot.full);
|
||||||
|
assert_eq!(snapshot.deltas.len(), 3);
|
||||||
|
|
||||||
|
let git_slot = snapshot
|
||||||
|
.deltas
|
||||||
|
.iter()
|
||||||
|
.find(|d| d.seq == git_event.seq)
|
||||||
|
.expect("git delta still present");
|
||||||
|
assert!(matches!(
|
||||||
|
git_slot.payload.as_slice(),
|
||||||
|
[WorkspaceDelta::GitReplace { .. }]
|
||||||
|
));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1084,7 +1084,7 @@ export function FileTreeTab() {
|
|||||||
// reflected in the backend's depth-2 tree_snapshot, so changes inside them
|
// reflected in the backend's depth-2 tree_snapshot, so changes inside them
|
||||||
// don't emit a tree_replace delta — the frontend has to target invalidation
|
// don't emit a tree_replace delta — the frontend has to target invalidation
|
||||||
// by matching each `changed_paths` entry against its cached ancestors.
|
// by matching each `changed_paths` entry against its cached ancestors.
|
||||||
// The backend already debounces raw FS events (1s / 3s max), so we only
|
// The backend already debounces raw FS events (300ms / 1.5s max), so we only
|
||||||
// need a microtask hop here to merge paths that hit the same cached
|
// need a microtask hop here to merge paths that hit the same cached
|
||||||
// ancestor within one envelope (or any synchronous burst of envelopes).
|
// ancestor within one envelope (or any synchronous burst of envelopes).
|
||||||
const subscribeWorkspaceEnvelopes = workspaceState.subscribeEnvelopes
|
const subscribeWorkspaceEnvelopes = workspaceState.subscribeEnvelopes
|
||||||
|
|||||||
Reference in New Issue
Block a user