Parallel scanning: early proof-of-concept implementation

And it's not looking good; this implementation seems to be 3x slower in
the hot-cache scenario with -J8, which is a major regression. There's
way too much lock contention and context switching going on.

I haven't tested with actual disk I/O yet, and I haven't measured how
much parallelism this approach actually gets us in practice, nor whether
its disk access patterns make a whole lot of sense. Maybe this
low-memory approach won't work out and I'll end up rewriting this to
scan disjoint subtrees after all.

TODO:
- Validate how much parallelism we can actually get with this algorithm
- Lots of benchmarking and tuning (and most likely some re-architecting)
- Re-implement exclude pattern matching
- Document -J option
- Make OOM handling thread-safe

Author: Yorhel, 2022-01-11 12:40:20 +01:00
parent 35dd631e55
commit c27dca1fba
3 changed files with 305 additions and 82 deletions


@@ -42,6 +42,7 @@ pub const config = struct {
pub var same_fs: bool = false;
pub var extended: bool = false;
pub var parallel: u8 = 1;
pub var follow_symlinks: bool = false;
pub var exclude_caches: bool = false;
pub var exclude_kernfs: bool = false;
@@ -158,7 +159,11 @@ const Args = struct {
fn argConfig(args: *Args, opt: Args.Option) bool {
if (opt.is("-q") or opt.is("--slow-ui-updates")) config.update_delay = 2*std.time.ns_per_s
else if (opt.is("--fast-ui-updates")) config.update_delay = 100*std.time.ns_per_ms
-else if (opt.is("-x") or opt.is("--one-file-system")) config.same_fs = true
else if (opt.is("-J")) {
const val = args.arg();
config.parallel = std.fmt.parseInt(u8, val, 10) catch ui.die("Invalid argument to -J: {s}, expected number.\n", .{val});
if (config.parallel == 0) ui.die("Number of threads (-J) cannot be 0.\n", .{});
} else if (opt.is("-x") or opt.is("--one-file-system")) config.same_fs = true
else if (opt.is("--cross-file-system")) config.same_fs = false
else if (opt.is("-e") or opt.is("--extended")) config.extended = true
else if (opt.is("--no-extended")) config.extended = false


@@ -53,11 +53,15 @@ const Stat = struct {
}
};

-var kernfs_cache: std.AutoHashMap(u64,bool) = std.AutoHashMap(u64,bool).init(main.allocator);

// This function only works on Linux
fn isKernfs(dir: std.fs.Dir, dev: u64) bool {
-if (kernfs_cache.get(dev)) |e| return e;
const state = struct {
var cache = std.AutoHashMap(u64,bool).init(main.allocator);
var lock = std.Thread.Mutex{};
};
state.lock.lock();
defer state.lock.unlock();
if (state.cache.get(dev)) |e| return e;
var buf: c_statfs.struct_statfs = undefined;
if (c_statfs.fstatfs(dir.fd, &buf) != 0) return false; // silently ignoring errors isn't too nice.
const iskern = switch (buf.f_type) {

@@ -77,7 +81,7 @@ fn isKernfs(dir: std.fs.Dir, dev: u64) bool {
=> true,
else => false,
};
-kernfs_cache.put(dev, iskern) catch {};
state.cache.put(dev, iskern) catch {};
return iskern;
}
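
The hunk above is what makes isKernfs() safe to call from multiple scanning
threads: the cache moves from a global into a function-local namespace struct
together with a mutex. A minimal stand-alone sketch of the same idiom, with a
made-up memoized predicate instead of the statfs() lookup (hypothetical example,
not ncdu code):

    const std = @import("std");

    // Memoize an expensive check behind a mutex so concurrent callers share one cache.
    fn isEvenCached(n: u64) bool {
        const state = struct {
            var cache = std.AutoHashMap(u64, bool).init(std.heap.page_allocator);
            var lock = std.Thread.Mutex{};
        };
        state.lock.lock();
        defer state.lock.unlock();
        if (state.cache.get(n)) |cached| return cached;
        const result = (n % 2) == 0;
        state.cache.put(n, result) catch {}; // on allocation failure, just skip caching
        return result;
    }

One side effect carried over from the real function: the expensive call (fstatfs()
there) now runs while the lock is held, so concurrent lookups serialize on it.
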
@@ -103,10 +107,10 @@ fn writeJsonString(wr: anytype, s: []const u8) !void {
try wr.writeByte('"');
}

-// A ScanDir represents an in-memory directory listing (i.e. model.Dir) where
// A MemDir represents an in-memory directory listing (i.e. model.Dir) where
// entries read from disk can be merged into, without doing an O(n) lookup for
// each entry.
-const ScanDir = struct {
const MemDir = struct {
dir: *model.Dir,

// Lookup table for name -> *entry.
@@ -263,7 +267,7 @@ const ScanDir = struct {
//
const Context = struct {
// When scanning to RAM
-parents: ?std.ArrayList(ScanDir) = null,
parents: ?std.ArrayList(MemDir) = null,
// When scanning to a file
wr: ?*Writer = null,

@@ -303,8 +307,8 @@ const Context = struct {
fn initMem(dir: ?*model.Dir) *Self {
var self = main.allocator.create(Self) catch unreachable;
-self.* = .{ .parents = std.ArrayList(ScanDir).init(main.allocator) };
self.* = .{ .parents = std.ArrayList(MemDir).init(main.allocator) };
-if (dir) |d| self.parents.?.append(ScanDir.init(d)) catch unreachable;
if (dir) |d| self.parents.?.append(MemDir.init(d)) catch unreachable;
return self;
}
@@ -427,7 +431,7 @@ const Context = struct {
p.items[p.items.len-1].addStat(self.name, &self.stat);
if (e.dir()) |d| // Enter the directory
-p.append(ScanDir.init(d)) catch unreachable;
p.append(MemDir.init(d)) catch unreachable;
} else if (self.wr) |wr|
self.writeStat(wr.writer(), dir_dev) catch |e| writeErr(e);
@@ -451,91 +455,305 @@ const Context = struct {
// Context that is currently being used for scanning.
var active_context: *Context = undefined;

-// Read and index entries of the given dir.
-fn scanDir(ctx: *Context, dir: std.fs.Dir, dir_dev: u64) void {
-    var it = main.allocator.create(std.fs.Dir.Iterator) catch unreachable;
-    defer main.allocator.destroy(it);
-    it.* = dir.iterate();
-    while (true) {
-        const entry = it.next() catch {
-            ctx.setDirlistError();
-            return;
-        } orelse break;
-        ctx.stat.dir = false;
-        ctx.pushPath(entry.name);
-        defer ctx.popPath();
-        main.handleEvent(false, false);
-        // XXX: This algorithm is extremely slow, can be optimized with some clever pattern parsing.
-        const excluded = blk: {
-            for (main.config.exclude_patterns.items) |pat| {
-                var path = ctx.pathZ();
-                while (path.len > 0) {
-                    if (c_fnmatch.fnmatch(pat, path, 0) == 0) break :blk true;
-                    if (std.mem.indexOfScalar(u8, path, '/')) |idx| path = path[idx+1..:0]
-                    else break;
-                }
-            }
-            break :blk false;
-        };
-        if (excluded) {
-            ctx.addSpecial(.excluded);
-            continue;
-        }
-        ctx.stat = Stat.read(dir, ctx.name, false) catch {
-            ctx.addSpecial(.err);
-            continue;
-        };
-        if (main.config.same_fs and ctx.stat.dev != dir_dev) {
-            ctx.addSpecial(.other_fs);
-            continue;
-        }
-        if (main.config.follow_symlinks and ctx.stat.symlink) {
-            if (Stat.read(dir, ctx.name, true)) |nstat| {
-                if (!nstat.dir) {
-                    ctx.stat = nstat;
-                    // Symlink targets may reside on different filesystems,
-                    // this will break hardlink detection and counting so let's disable it.
-                    if (ctx.stat.hlinkc and ctx.stat.dev != dir_dev)
-                        ctx.stat.hlinkc = false;
-                }
-            } else |_| {}
-        }
-        var edir =
-            if (ctx.stat.dir) dir.openDirZ(ctx.name, .{ .access_sub_paths = true, .iterate = true, .no_follow = true }) catch {
-                ctx.addSpecial(.err);
-                continue;
-            } else null;
-        defer if (edir != null) edir.?.close();
-        if (@import("builtin").os.tag == .linux and main.config.exclude_kernfs and ctx.stat.dir and isKernfs(edir.?, ctx.stat.dev)) {
-            ctx.addSpecial(.kernfs);
-            continue;
-        }
-        if (main.config.exclude_caches and ctx.stat.dir) {
-            if (edir.?.openFileZ("CACHEDIR.TAG", .{})) |f| {
-                const sig = "Signature: 8a477f597d28d172789f06886806bc55";
-                var buf: [sig.len]u8 = undefined;
-                if (f.reader().readAll(&buf)) |len| {
-                    if (len == sig.len and std.mem.eql(u8, &buf, sig)) {
-                        ctx.addSpecial(.excluded);
-                        continue;
-                    }
-                } else |_| {}
-            } else |_| {}
-        }
-        ctx.addStat(dir_dev);
-        if (ctx.stat.dir) scanDir(ctx, edir.?, ctx.stat.dev);
-    }
-}

// The following filesystem scanning implementation is designed to support
// some degree of parallelism while generating a serialized tree without
// consuming ~too~ much memory.
//
// It would likely be easier and more efficient to have each thread work on a
// completely separate branch of the filesystem tree, but our current JSON
// export format requires that entries are output in a certain order, which
// means we either need to construct the full tree in memory before generating
// any output (which I'd really rather not do), or we're stuck scanning the
// filesystem in the required order and lose some opportunities for
// parallelism. This is an attempt at doing the latter.
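// To illustrate the ordering constraint: for a small tree like
//
//   root/
//     a/
//       x
//     b
//
// the export has to emit root, a, x, b in that order, i.e. depth-first with each
// directory immediately followed by its contents. Sibling subtrees can therefore
// not simply be handed to independent threads; roughly speaking, only work on and
// just ahead of the path currently being written out can run in parallel.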
const scanner = struct {
var tail: *Level = undefined;
// Currently used to protect both the scan stack state and the output
// Context, may be worth trying to split in two.
var lock = std.Thread.Mutex{};
var cond = std.Thread.Condition{};
// Number of stat() calls to batch in a single task; This little thread
// pool implementation is pretty damn inefficient, so batching helps cut
// down on synchronization overhead. Can be removed if we ever manage to
// migrate to a more efficient thread pool.
const BATCH: usize = 128;
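// As a rough example: a directory with 10k entries is consumed in ~80 batches of
// 128, so the lock is taken on the order of a hundred times for that directory
// instead of once per entry.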
// Maximum number of name lists to keep for each level in the stack. Higher
// number means potentially higher degree of parallelism, but comes at the
// cost of potentially higher memory and file descriptor use.
const SUBDIRS_PER_LEVEL: u8 = 8;
const StatEntry = struct {
name: [:0]u8,
stat: Stat,
};
const SpecialEntry = struct {
name: [:0]u8,
t: Context.Special,
};
const NextLevel = struct {
name: [:0]u8,
stat: Stat,
level: *Level, // XXX: Only 'dir', 'names', and 'specials' are really relevant here
};
// Represents a directory that is being scanned.
const Level = struct {
dir: std.fs.Dir,
dir_dev: u64,
dirListError: bool = false,
names: std.ArrayListUnmanaged([:0]u8) = .{}, // Queue of names to stat()
names_busy: u8 = 0, // Number of threads running stat()
files: std.ArrayListUnmanaged(StatEntry) = .{}, // Queue of files we can output
specials: std.ArrayListUnmanaged(SpecialEntry) = .{}, // Queue of "special" items we can output
dirs: std.ArrayListUnmanaged(StatEntry) = .{}, // Queue of dirs we can read
dirs_busy: u8 = 0, // Number of 'dirs' being processed at the moment
next: std.ArrayListUnmanaged(NextLevel) = .{}, // Queue of subdirs to scan next
sub: ?*Level = null, // Subdirectory currently being scanned
parent: ?*Level,
// Assumption: all queues are empty
fn destroy(lvl: *Level) void {
lvl.dir.close();
lvl.names.deinit(main.allocator);
lvl.files.deinit(main.allocator);
lvl.specials.deinit(main.allocator);
lvl.dirs.deinit(main.allocator);
lvl.next.deinit(main.allocator);
main.allocator.destroy(lvl);
}
}
};
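// The Level structs form a chain from the root of the scan down to the directory
// whose entries are currently being written out; 'tail' points at the deepest one.
// Output only happens at a level with no active subdirectory (see outputQueue()
// below), which is what keeps the serialized entry order intact.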
// Drain the output queue ('files', 'specials') if we can.
// Assumes we hold the lock.
fn outputQueue(lvl: *Level) void {
if (lvl.sub != null) return;
if (lvl.dirListError) {
active_context.setDirlistError();
lvl.dirListError = false;
}
for (lvl.specials.items) |e| {
active_context.stat.dir = false;
active_context.pushPath(e.name);
active_context.addSpecial(e.t);
active_context.popPath();
main.allocator.free(e.name);
}
for (lvl.files.items) |e| {
// TODO: ctx API is inefficient here, no need to copy that Stat
active_context.stat.dir = false;
active_context.pushPath(e.name);
active_context.stat = e.stat;
active_context.addStat(lvl.dir_dev);
active_context.popPath();
main.allocator.free(e.name);
}
lvl.specials.clearRetainingCapacity();
lvl.files.clearRetainingCapacity();
}
// Leave the current dir if we're done with it and find a new dir to enter.
fn navigate() void {
//std.debug.print("ctx={s}, names={} dirs={} next={}\n", .{ active_context.path.items, tail.names.items.len, tail.dirs.items.len, tail.next.items.len });
// Assumption: outputQueue() has been called on the tail, so
// 'files' and 'specials' are always empty.
while (tail.parent != null and tail.sub == null
and tail.names.items.len == 0 and tail.names_busy == 0
and tail.dirs.items.len == 0 and tail.dirs_busy == 0
and tail.next.items.len == 0
) {
//std.debug.print("Pop\n", .{});
active_context.popPath();
const lvl = tail;
lvl.parent.?.sub = null;
tail = lvl.parent.?;
lvl.destroy();
outputQueue(tail);
}
if (tail.sub == null and tail.next.items.len > 0) {
const sub = tail.next.pop();
//std.debug.print("Push {s}\n", .{sub.name});
active_context.pushPath(sub.name);
active_context.stat = sub.stat;
active_context.addStat(tail.dir_dev);
main.allocator.free(sub.name);
tail.sub = sub.level;
tail = sub.level;
}
// TODO: Only wake up threads when there's enough new work queued, all
// that context switching is SLOW.
cond.broadcast();
}
fn readNamesDir(lvl: *Level) void {
var it = lvl.dir.iterate();
while (true) {
const entry = it.next() catch {
lvl.dirListError = true;
break;
} orelse break;
// TODO: Check for exclude patterns
lvl.names.append(main.allocator, main.allocator.dupeZ(u8, entry.name) catch unreachable) catch unreachable;
}
}

fn readNames(parent: *Level) void {
const stat = parent.dirs.pop();
lock.unlock();

var dir = parent.dir.openDirZ(stat.name, .{ .access_sub_paths = true, .iterate = true, .no_follow = true }) catch {
lock.lock();
parent.specials.append(main.allocator, .{ .name = stat.name, .t = .err }) catch unreachable;
return;
};

if (@import("builtin").os.tag == .linux and main.config.exclude_kernfs and isKernfs(dir, stat.stat.dev)) {
lock.lock();
parent.specials.append(main.allocator, .{ .name = stat.name, .t = .kernfs }) catch unreachable;
return;
}

if (main.config.exclude_caches) {
if (dir.openFileZ("CACHEDIR.TAG", .{})) |f| {
const sig = "Signature: 8a477f597d28d172789f06886806bc55";
var buf: [sig.len]u8 = undefined;
if (f.reader().readAll(&buf)) |len| {
if (len == sig.len and std.mem.eql(u8, &buf, sig)) {
lock.lock();
parent.specials.append(main.allocator, .{ .name = stat.name, .t = .excluded }) catch unreachable;
return;
}
} else |_| {}
} else |_| {}
}

var lvl = main.allocator.create(Level) catch unreachable;
lvl.* = .{ .dir = dir, .dir_dev = stat.stat.dev, .parent = parent };
readNamesDir(lvl);
lock.lock();

// Treat empty directories as files
if (lvl.names.items.len == 0 and lvl.specials.items.len == 0) {
if (lvl.dirListError) { // XXX: this loses information about this entry being a directory :(
parent.specials.append(main.allocator, .{ .name = stat.name, .t = .err }) catch unreachable;
} else
parent.files.append(main.allocator, stat) catch unreachable;
dir.close();
main.allocator.destroy(lvl);
} else {
parent.next.append(main.allocator, .{ .name = stat.name, .stat = stat.stat, .level = lvl }) catch unreachable;
}
}
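
// stat() a batch of names from lvl.names. Called with the lock held; the lock is
// released around the actual stat() calls and re-acquired before the results are
// sorted into the 'specials', 'dirs' and 'files' queues.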
fn statNames(lvl: *Level) void {
var names: [BATCH][:0]u8 = undefined;
var stats: [BATCH]Stat = undefined;
var errs: [BATCH]bool = undefined;
const len = std.math.min(names.len, lvl.names.items.len);
std.mem.copy([]u8, &names, lvl.names.items[lvl.names.items.len-len..]);
lvl.names.items.len -= len;
lock.unlock();
var i: usize = 0;
while (i < len) : (i += 1) {
if (Stat.read(lvl.dir, names[i], false)) |s| {
errs[i] = false;
if (main.config.follow_symlinks and s.symlink) {
if (Stat.read(lvl.dir, names[i], true)) |nstat| {
if (!nstat.dir) {
stats[i] = nstat;
// Symlink targets may reside on different filesystems,
// this will break hardlink detection and counting so let's disable it.
if (nstat.hlinkc and nstat.dev != lvl.dir_dev)
stats[i].hlinkc = false;
}
} else |_| stats[i] = s;
} else stats[i] = s;
} else |_|
errs[i] = true;
}
lock.lock();
i = 0;
while (i < len) : (i += 1) {
if (errs[i])
lvl.specials.append(main.allocator, .{ .name = names[i], .t = .err }) catch unreachable
else if (main.config.same_fs and stats[i].dev != lvl.dir_dev)
lvl.specials.append(main.allocator, .{ .name = names[i], .t = .other_fs }) catch unreachable
else if (stats[i].dir)
lvl.dirs.append(main.allocator, .{ .name = names[i], .stat = stats[i] }) catch unreachable
else
lvl.files.append(main.allocator, .{ .name = names[i], .stat = stats[i] }) catch unreachable;
}
}
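
// Worker loop: walk the Level chain from the tail up towards the root looking for
// work, preferring to open queued subdirectories (which keeps the 'names' queues
// filled) over stat()ing names. When nothing is found, wait on the condition
// variable; the scan is finished once the root level has no queued or in-flight
// work left.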
fn runThread() void {
lock.lock();
outer: while (true) {
var curlvl: ?*Level = tail;
while (curlvl) |lvl| : (curlvl = lvl.parent) {
// If we have subdirectories to read, do that first to keep the 'names' queues filled up.
if (lvl.dirs.items.len > 0 and lvl.dirs_busy + lvl.next.items.len < SUBDIRS_PER_LEVEL) {
lvl.dirs_busy += 1;
readNames(lvl);
lvl.dirs_busy -= 1;
outputQueue(lvl);
navigate();
continue :outer;
}
// Then look for names to stat
if (lvl.names.items.len > 0) {
lvl.names_busy += 1;
statNames(lvl);
lvl.names_busy -= 1;
outputQueue(lvl);
navigate();
continue :outer;
}
}
// If we're here, then we found no work to do.
if (tail.parent == null and tail.dirs_busy == 0 and tail.names_busy == 0) {
cond.broadcast(); // only necessary if we don't always wake up threads when there's work to do.
break;
}
cond.wait(&lock);
}
lock.unlock();
}
// TODO: progress UI
// Scan the given dir. The active_context is assumed to have been
// initialized already and the entry for the given *dir has already been
// output.
// The given dir is closed when done.
fn scan(dir: std.fs.Dir, dir_dev: u64) void {
tail = main.allocator.create(Level) catch unreachable;
tail.* = .{ .dir = dir, .dir_dev = dir_dev, .parent = null };
readNamesDir(tail);
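// The calling thread participates as well, so -J N means N-1 spawned workers
// plus this one, all running runThread().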
var threads = main.allocator.alloc(std.Thread, main.config.parallel-1) catch unreachable;
for (threads) |*t| t.* = std.Thread.spawn(.{ .stack_size = 128*1024 }, runThread, .{}) catch unreachable;
runThread();
for (threads) |*t| t.join();
tail.destroy();
tail = undefined;
}
};
pub fn scanRoot(path: []const u8, out: ?std.fs.File) !void {
active_context = if (out) |f| Context.initFile(f) else Context.initMem(null);

@@ -569,8 +787,7 @@ pub fn scan() void {
main.handleEvent(true, true);
return;
};
-defer dir.close();
-scanDir(active_context, dir, active_context.stat.dev);
scanner.scan(dir, active_context.stat.dev);
active_context.popPath();
active_context.final();
}


@@ -42,6 +42,7 @@ pub fn quit() noreturn {
// Also, init() and other ncurses-related functions may have hidden allocation,
// no clue if ncurses will consistently report OOM, but we're not handling that
// right now.
// TODO: Make thread-safe.
pub fn oom() void {
const haveui = inited;
deinit();