From f2541d42ba411fe93018d18700ff30de764a56b5 Mon Sep 17 00:00:00 2001 From: Yorhel Date: Fri, 12 Jul 2024 12:31:57 +0200 Subject: [PATCH] Rewrite scan/import code, experiment with multithreaded scanning (again) Benchmarks are looking very promising this time. This commit breaks a lot, though: - Hard link counting - Refreshing - JSON import - JSON export - Progress UI - OOM handling is not thread-safe All of which needs to be reimplemented and fixed again. Also haven't really tested this code very well yet so there's likely to be bugs. There's also a behavioral change: --exclude-kernfs is not checked on the given root directory anymore, meaning that the filesystem the user asked to scan is being scanned even if that's a 'kernfs'. I suspect that's more sensible behavior. The old scan.zig was quite messy and hard for me to reason about and extend, this new sink API is looking to be less confusing. I hope it stays that way as more features are added. --- src/browser.zig | 2 +- src/main.zig | 27 +- src/model.zig | 27 +- src/scan.zig | 1261 +++++++++-------------------------------------- src/sink.zig | 354 +++++++++++++ src/ui.zig | 1 + 6 files changed, 611 insertions(+), 1061 deletions(-) create mode 100644 src/sink.zig diff --git a/src/browser.zig b/src/browser.zig index be4190b..e48c916 100644 --- a/src/browser.zig +++ b/src/browser.zig @@ -848,7 +848,7 @@ pub fn keyInput(ch: i32) void { message = "Directory refresh feature disabled." else { main.state = .refresh; - scan.setupRefresh(dir_parent); + //scan.setupRefresh(dir_parent); } }, 'b' => { diff --git a/src/main.zig b/src/main.zig index 61d9ef8..a592b7d 100644 --- a/src/main.zig +++ b/src/main.zig @@ -6,6 +6,7 @@ pub const program_version = "2.4"; const std = @import("std"); const model = @import("model.zig"); const scan = @import("scan.zig"); +const sink = @import("sink.zig"); const ui = @import("ui.zig"); const browser = @import("browser.zig"); const delete = @import("delete.zig"); @@ -16,6 +17,7 @@ const c = @cImport(@cInclude("locale.h")); test "imports" { _ = model; _ = scan; + _ = sink; _ = ui; _ = browser; _ = delete; @@ -57,6 +59,7 @@ pub const config = struct { pub var exclude_caches: bool = false; pub var exclude_kernfs: bool = false; pub var exclude_patterns: std.ArrayList([:0]const u8) = std.ArrayList([:0]const u8).init(allocator); + pub var threads: usize = 1; pub var update_delay: u64 = 100*std.time.ns_per_ms; pub var scan_ui: ?enum { none, line, full } = null; @@ -456,7 +459,7 @@ pub fn main() void { } } - var scan_dir: ?[]const u8 = null; + var scan_dir: ?[:0]const u8 = null; var import_file: ?[:0]const u8 = null; var export_file: ?[:0]const u8 = null; var quit_after_scan = false; @@ -480,11 +483,14 @@ pub fn main() void { else if (opt.is("-f")) import_file = allocator.dupeZ(u8, args.arg()) catch unreachable else if (opt.is("--ignore-config")) {} else if (opt.is("--quit-after-scan")) quit_after_scan = true // undocumented feature to help with benchmarking scan/import + else if (opt.is("--experimental-threads")) config.threads = std.fmt.parseInt(u8, args.arg(), 10) catch ui.die("Invalid number of threads.\n", .{}) else if (argConfig(&args, opt)) {} else ui.die("Unrecognized option '{s}'.\n", .{opt.val}); } } + if (config.threads == 0) config.threads = std.Thread.getCpuCount() catch 1; + if (@import("builtin").os.tag != .linux and config.exclude_kernfs) ui.die("The --exclude-kernfs flag is currently only supported on Linux.\n", .{}); @@ -511,11 +517,16 @@ pub fn main() void { catch |e| ui.die("Error opening export file: {s}.\n", .{ui.errorString(e)}) ) else null; - if (import_file) |f| { - scan.importRoot(f, out_file); + if (import_file) |_| { + //scan.importRoot(f, out_file); config.imported = true; - } else scan.scanRoot(scan_dir orelse ".", out_file) - catch |e| ui.die("Error opening directory: {s}.\n", .{ui.errorString(e)}); + } else { + var buf = [_]u8{0} ** (std.fs.MAX_PATH_BYTES+1); + const path = + if (std.posix.realpathZ(scan_dir orelse ".", buf[0..buf.len-1])) |p| buf[0..p.len:0] + else |_| (scan_dir orelse "."); + scan.scan(path) catch |e| ui.die("Error opening directory: {s}.\n", .{ui.errorString(e)}); + } if (quit_after_scan or out_file != null) return; config.can_shell = config.can_shell orelse !config.imported; @@ -531,7 +542,7 @@ pub fn main() void { while (true) { switch (state) { .refresh => { - scan.scan(); + //scan.scan(); state = .browse; browser.loadDir(null); }, @@ -557,7 +568,7 @@ pub fn handleEvent(block: bool, force_draw: bool) void { if (block or force_draw or event_delay_timer.read() > config.update_delay) { if (ui.inited) _ = ui.c.erase(); switch (state) { - .scan, .refresh => scan.draw(), + .scan, .refresh => sink.draw(), .browse => browser.draw(), .delete => delete.draw(), .shell => unreachable, @@ -576,7 +587,7 @@ pub fn handleEvent(block: bool, force_draw: bool) void { if (ch == 0) return; if (ch == -1) return handleEvent(firstblock, true); switch (state) { - .scan, .refresh => scan.keyInput(ch), + .scan, .refresh => sink.keyInput(ch), .browse => browser.keyInput(ch), .delete => delete.keyInput(ch), .shell => unreachable, diff --git a/src/model.zig b/src/model.zig index 66e8400..791e439 100644 --- a/src/model.zig +++ b/src/model.zig @@ -6,15 +6,6 @@ const main = @import("main.zig"); const ui = @import("ui.zig"); const util = @import("util.zig"); -// While an arena allocator is optimimal for almost all scenarios in which ncdu -// is used, it doesn't allow for re-using deleted nodes after doing a delete or -// refresh operation, so a long-running ncdu session with regular refreshes -// will leak memory, but I'd say that's worth the efficiency gains. -// TODO: Can still implement a simple bucketed free list on top of this arena -// allocator to reuse nodes, if necessary. -var allocator_state = std.heap.ArenaAllocator.init(std.heap.page_allocator); -const allocator = allocator_state.allocator(); - pub const EType = enum(u2) { dir, link, file }; // Type for the Entry.Packed.blocks field. Smaller than a u64 to make room for flags. @@ -43,6 +34,7 @@ pub const Entry = extern struct { // Counting of Link entries is deferred until the scan/delete operation has // completed, so for those entries this flag indicates an intention to be // counted. + // TODO: Think we can remove this counted: bool = false, blocks: Blocks = 0, // 512-byte blocks }; @@ -82,7 +74,7 @@ pub const Entry = extern struct { return @ptrCast(@as([*]Ext, @ptrCast(self)) - 1); } - fn alloc(comptime T: type, etype: EType, isext: bool, ename: []const u8) *Entry { + fn alloc(comptime T: type, allocator: std.mem.Allocator, etype: EType, isext: bool, ename: []const u8) *Entry { const size = (if (isext) @as(usize, @sizeOf(Ext)) else 0) + @sizeOf(T) + ename.len + 1; var ptr = blk: while (true) { if (allocator.allocWithOptions(u8, size, 1, null)) |p| break :blk p @@ -101,11 +93,11 @@ pub const Entry = extern struct { return &e.entry; } - pub fn create(etype: EType, isext: bool, ename: []const u8) *Entry { + pub fn create(allocator: std.mem.Allocator, etype: EType, isext: bool, ename: []const u8) *Entry { return switch (etype) { - .dir => alloc(Dir, etype, isext, ename), - .file => alloc(File, etype, isext, ename), - .link => alloc(Link, etype, isext, ename), + .dir => alloc(Dir, allocator, etype, isext, ename), + .file => alloc(File, allocator, etype, isext, ename), + .link => alloc(Link, allocator, etype, isext, ename), }; } @@ -321,12 +313,15 @@ pub const Ext = extern struct { // List of st_dev entries. Those are typically 64bits, but that's quite a waste // of space when a typical scan won't cover many unique devices. pub const devices = struct { + var lock = std.Thread.Mutex{}; // id -> dev pub var list = std.ArrayList(u64).init(main.allocator); // dev -> id var lookup = std.AutoHashMap(u64, DevId).init(main.allocator); pub fn getId(dev: u64) DevId { + lock.lock(); + defer lock.unlock(); const d = lookup.getOrPut(dev) catch unreachable; if (!d.found_existing) { d.value_ptr.* = @as(DevId, @intCast(list.items.len)); @@ -462,7 +457,9 @@ pub var root: *Dir = undefined; test "entry" { - var e = Entry.create(.file, false, "hello"); + var arena = std.heap.ArenaAllocator.init(std.testing.allocator); + defer arena.deinit(); + var e = Entry.create(arena.allocator(), .file, false, "hello"); try std.testing.expectEqual(e.pack.etype, .file); try std.testing.expect(!e.pack.isext); try std.testing.expectEqualStrings(e.name(), "hello"); diff --git a/src/scan.zig b/src/scan.zig index a7dc91a..286bd09 100644 --- a/src/scan.zig +++ b/src/scan.zig @@ -3,61 +3,23 @@ const std = @import("std"); const main = @import("main.zig"); -const model = @import("model.zig"); -const ui = @import("ui.zig"); const util = @import("util.zig"); +const model = @import("model.zig"); +const sink = @import("sink.zig"); +const ui = @import("ui.zig"); const exclude = @import("exclude.zig"); const c_statfs = @cImport(@cInclude("sys/vfs.h")); -// Concise stat struct for fields we're interested in, with the types used by the model. -const Stat = struct { - blocks: model.Blocks = 0, - size: u64 = 0, - dev: u64 = 0, - ino: u64 = 0, - nlink: u31 = 0, - hlinkc: bool = false, - dir: bool = false, - reg: bool = true, - symlink: bool = false, - ext: model.Ext = .{}, - - fn clamp(comptime T: type, comptime field: anytype, x: anytype) std.meta.fieldInfo(T, field).type { - return util.castClamp(std.meta.fieldInfo(T, field).type, x); - } - - fn truncate(comptime T: type, comptime field: anytype, x: anytype) std.meta.fieldInfo(T, field).type { - return util.castTruncate(std.meta.fieldInfo(T, field).type, x); - } - - fn read(parent: std.fs.Dir, name: [:0]const u8, follow: bool) !Stat { - const stat = try std.posix.fstatatZ(parent.fd, name, if (follow) 0 else std.posix.AT.SYMLINK_NOFOLLOW); - return Stat{ - .blocks = clamp(Stat, .blocks, stat.blocks), - .size = clamp(Stat, .size, stat.size), - .dev = truncate(Stat, .dev, stat.dev), - .ino = truncate(Stat, .ino, stat.ino), - .nlink = clamp(Stat, .nlink, stat.nlink), - .hlinkc = stat.nlink > 1 and !std.posix.S.ISDIR(stat.mode), - .dir = std.posix.S.ISDIR(stat.mode), - .reg = std.posix.S.ISREG(stat.mode), - .symlink = std.posix.S.ISLNK(stat.mode), - .ext = .{ - .mtime = clamp(model.Ext, .mtime, stat.mtime().tv_sec), - .uid = truncate(model.Ext, .uid, stat.uid), - .gid = truncate(model.Ext, .gid, stat.gid), - .mode = truncate(model.Ext, .mode, stat.mode), - }, - }; - } -}; - -var kernfs_cache: std.AutoHashMap(u64,bool) = std.AutoHashMap(u64,bool).init(main.allocator); - // This function only works on Linux fn isKernfs(dir: std.fs.Dir, dev: u64) bool { - if (kernfs_cache.get(dev)) |e| return e; + const state = struct { + var lock: std.Thread.Mutex = .{}; + var cache: std.AutoHashMap(u64,bool) = std.AutoHashMap(u64,bool).init(main.allocator); + }; + state.lock.lock(); + defer state.lock.unlock(); + if (state.cache.get(dev)) |e| return e; var buf: c_statfs.struct_statfs = undefined; if (c_statfs.fstatfs(dir.fd, &buf) != 0) return false; // silently ignoring errors isn't too nice. const iskern = switch (util.castTruncate(u32, buf.f_type)) { @@ -77,1049 +39,274 @@ fn isKernfs(dir: std.fs.Dir, dev: u64) bool { => true, else => false, }; - kernfs_cache.put(dev, iskern) catch {}; + state.cache.put(dev, iskern) catch {}; return iskern; } -// Output a JSON string. -// Could use std.json.stringify(), but that implementation is "correct" in that -// it refuses to encode non-UTF8 slices as strings. Ncdu dumps aren't valid -// JSON if we have non-UTF8 filenames, such is life... -fn writeJsonString(wr: anytype, s: []const u8) !void { - try wr.writeByte('"'); - for (s) |ch| { - switch (ch) { - '\n' => try wr.writeAll("\\n"), - '\r' => try wr.writeAll("\\r"), - 0x8 => try wr.writeAll("\\b"), - '\t' => try wr.writeAll("\\t"), - 0xC => try wr.writeAll("\\f"), - '\\' => try wr.writeAll("\\\\"), - '"' => try wr.writeAll("\\\""), - 0...7, 0xB, 0xE...0x1F, 127 => try wr.print("\\u00{x:0>2}", .{ch}), - else => try wr.writeByte(ch) - } - } - try wr.writeByte('"'); + +fn clamp(comptime T: type, comptime field: anytype, x: anytype) std.meta.fieldInfo(T, field).type { + return util.castClamp(std.meta.fieldInfo(T, field).type, x); } -// A ScanDir represents an in-memory directory listing (i.e. model.Dir) where -// entries read from disk can be merged into, without doing an O(1) lookup for -// each entry. -const ScanDir = struct { - dir: *model.Dir, - // Lookup table for name -> *entry. - // null is never stored in the table, but instead used pass a name string - // as out-of-band argument for lookups. - entries: Map, - const Map = std.HashMap(?*model.Entry, void, HashContext, 80); +fn truncate(comptime T: type, comptime field: anytype, x: anytype) std.meta.fieldInfo(T, field).type { + return util.castTruncate(std.meta.fieldInfo(T, field).type, x); +} - const HashContext = struct { - cmp: []const u8 = "", - pub fn hash(self: @This(), v: ?*model.Entry) u64 { - return std.hash.Wyhash.hash(0, if (v) |e| @as([]const u8, e.name()) else self.cmp); - } - - pub fn eql(self: @This(), ap: ?*model.Entry, bp: ?*model.Entry) bool { - if (ap == bp) return true; - const a = if (ap) |e| @as([]const u8, e.name()) else self.cmp; - const b = if (bp) |e| @as([]const u8, e.name()) else self.cmp; - return std.mem.eql(u8, a, b); - } +fn statAt(parent: std.fs.Dir, name: [:0]const u8, follow: bool) !sink.Stat { + const stat = try std.posix.fstatatZ(parent.fd, name, if (follow) 0 else std.posix.AT.SYMLINK_NOFOLLOW); + return sink.Stat{ + .blocks = clamp(sink.Stat, .blocks, stat.blocks), + .size = clamp(sink.Stat, .size, stat.size), + .dev = truncate(sink.Stat, .dev, stat.dev), + .ino = truncate(sink.Stat, .ino, stat.ino), + .nlink = clamp(sink.Stat, .nlink, stat.nlink), + .hlinkc = stat.nlink > 1 and !std.posix.S.ISDIR(stat.mode), + .dir = std.posix.S.ISDIR(stat.mode), + .reg = std.posix.S.ISREG(stat.mode), + .symlink = std.posix.S.ISLNK(stat.mode), + .ext = .{ + .mtime = clamp(model.Ext, .mtime, stat.mtime().tv_sec), + .uid = truncate(model.Ext, .uid, stat.uid), + .gid = truncate(model.Ext, .gid, stat.gid), + .mode = truncate(model.Ext, .mode, stat.mode), + }, }; +} - const Self = @This(); - fn init(dir: *model.Dir) Self { - var self = Self{ - .dir = dir, - .entries = Map.initContext(main.allocator, HashContext{}), - }; +fn isCacheDir(dir: std.fs.Dir) bool { + const sig = "Signature: 8a477f597d28d172789f06886806bc55"; + const f = dir.openFileZ("CACHEDIR.TAG", .{}) catch return false; + defer f.close(); + var buf: [sig.len]u8 = undefined; + const len = f.reader().readAll(&buf) catch return false; + return len == sig.len and std.mem.eql(u8, &buf, sig); +} - var count: Map.Size = 0; - var it = dir.sub; - while (it) |e| : (it = e.next) count += 1; - self.entries.ensureUnusedCapacity(count) catch unreachable; - it = dir.sub; - while (it) |e| : (it = e.next) - self.entries.putAssumeCapacity(e, @as(void,undefined)); - return self; +const State = struct { + // Simple LIFO queue. Threads attempt to fully scan their assigned + // directory before consulting this queue for their next task, so there + // shouldn't be too much contention here. + // TODO: unless threads keep juggling around leaf nodes, need to measure + // actual use. + // There's no real reason for this to be LIFO other than that that was the + // easiest to implement. Queue order has an effect on scheduling, but it's + // impossible for me to predict how that ends up affecting performance. + queue: [QUEUE_SIZE]*Dir = undefined, + queue_len: std.atomic.Value(usize) = std.atomic.Value(usize).init(0), + queue_lock: std.Thread.Mutex = .{}, + queue_cond: std.Thread.Condition = .{}, + + threads: []Thread, + waiting: usize = 0, + + // No clue what this should be set to. Dir structs aren't small so we don't + // want too have too many of them. + const QUEUE_SIZE = 16; + + // Returns true if the given Dir has been queued, false if the queue is full. + fn tryPush(self: *State, d: *Dir) bool { + if (self.queue_len.load(.acquire) == QUEUE_SIZE) return false; + { + self.queue_lock.lock(); + defer self.queue_lock.unlock(); + if (self.queue_len.load(.monotonic) == QUEUE_SIZE) return false; + const slot = self.queue_len.fetchAdd(1, .monotonic); + self.queue[slot] = d; + } + self.queue_cond.signal(); + return true; } - fn addSpecial(self: *Self, name: []const u8, t: Context.Special) void { - var e = blk: { - if (self.entries.getEntryAdapted(@as(?*model.Entry,null), HashContext{ .cmp = name })) |entry| { - // XXX: If the type doesn't match, we could always do an - // in-place conversion to a File entry. That's more efficient, - // but also more code. I don't expect this to happen often. - var e = entry.key_ptr.*.?; - if (e.pack.etype == .file) { - if (e.size > 0 or e.pack.blocks > 0) { - e.delStats(self.dir); - e.size = 0; - e.pack.blocks = 0; - e.addStats(self.dir, 0); - } - e.file().?.pack = .{}; - _ = self.entries.removeAdapted(@as(?*model.Entry,null), HashContext{ .cmp = name }); - break :blk e; - } else e.delStatsRec(self.dir); + // Blocks while the queue is empty, returns null when all threads are blocking. + fn waitPop(self: *State) ?*Dir { + self.queue_lock.lock(); + defer self.queue_lock.unlock(); + + self.waiting += 1; + while (self.queue_len.load(.monotonic) == 0) { + if (self.waiting == self.threads.len) { + self.queue_cond.broadcast(); + return null; } - var e = model.Entry.create(.file, false, name); - e.next = self.dir.sub; - self.dir.sub = e; - e.addStats(self.dir, 0); - break :blk e; - }; - var f = e.file().?; - switch (t) { - .err => f.pack.err = true, - .other_fs => f.pack.other_fs = true, - .kernfs => f.pack.kernfs = true, - .excluded => f.pack.excluded = true, + self.queue_cond.wait(&self.queue_lock); } - } + self.waiting -= 1; - fn addStat(self: *Self, name: []const u8, stat: *Stat) *model.Entry { - const etype = if (stat.dir) model.EType.dir - else if (stat.hlinkc) model.EType.link - else model.EType.file; - var e = blk: { - if (self.entries.getEntryAdapted(@as(?*model.Entry,null), HashContext{ .cmp = name })) |entry| { - // XXX: In-place conversion may also be possible here. - var e = entry.key_ptr.*.?; - // changes of dev/ino affect hard link counting in a way we can't simply merge. - const samedev = if (e.dir()) |d| d.pack.dev == model.devices.getId(stat.dev) else true; - const sameino = if (e.link()) |l| l.ino == stat.ino else true; - if (e.pack.etype == etype and samedev and sameino) { - _ = self.entries.removeAdapted(@as(?*model.Entry,null), HashContext{ .cmp = name }); - break :blk e; - } else e.delStatsRec(self.dir); - } - var e = model.Entry.create(etype, main.config.extended, name); - e.next = self.dir.sub; - self.dir.sub = e; - break :blk e; - }; - // Ignore the new size/blocks field for directories, as we don't know - // what the original values were without calling delStats() on the - // entire subtree, which, in turn, would break all shared hardlink - // sizes. The current approach may result in incorrect sizes after - // refresh, but I expect the difference to be fairly minor. - if (!(e.pack.etype == .dir and e.pack.counted) and (e.pack.blocks != stat.blocks or e.size != stat.size)) { - e.delStats(self.dir); - e.pack.blocks = stat.blocks; - e.size = stat.size; - } - if (e.dir()) |d| { - d.parent = self.dir; - d.pack.dev = model.devices.getId(stat.dev); - } - if (e.file()) |f| f.pack = .{ .notreg = !stat.dir and !stat.reg }; - if (e.link()) |l| l.ino = stat.ino; - if (e.ext()) |ext| { - if (ext.mtime > stat.ext.mtime) - stat.ext.mtime = ext.mtime; - ext.* = stat.ext; - } - - e.addStats(self.dir, stat.nlink); - return e; - } - - fn final(self: *Self) void { - if (self.entries.count() > 0) { - var it = &self.dir.sub; - while (it.*) |e| { - if (self.entries.getKey(e) == e) { - e.delStatsRec(self.dir); - it.* = e.next; - } else - it = &e.next; - } - } - self.dir.updateSubErr(); - } - - fn deinit(self: *Self) void { - self.entries.deinit(); + const slot = self.queue_len.fetchSub(1, .monotonic) - 1; + defer self.queue[slot] = undefined; + return self.queue[slot]; } }; -// Scan/import context. Entries are added in roughly the following way: -// -// ctx.pushPath(name) -// ctx.stat = ..; -// ctx.addSpecial() or ctx.addStat() -// if (ctx.stat.dir) { -// // repeat top-level steps for files in dir, recursively. -// } -// ctx.popPath(); -// -const Context = struct { - // When scanning to RAM - parents: ?std.ArrayList(ScanDir) = null, - refreshing: ?*model.Dir = null, - // When scanning to a file - wr: ?*Writer = null, - path: std.ArrayList(u8) = std.ArrayList(u8).init(main.allocator), - path_indices: std.ArrayList(usize) = std.ArrayList(usize).init(main.allocator), - items_seen: u32 = 0, +const Dir = struct { + fd: std.fs.Dir, + dev: u64, + pat: exclude.Patterns, + it: std.fs.Dir.Iterator, + sink: *sink.Dir, - // 0-terminated name of the top entry, points into 'path', invalid after popPath(). - // This is a workaround to Zig's directory iterator not returning a [:0]const u8. - name: [:0]const u8 = undefined, - - last_error: ?[:0]u8 = null, - fatal_error: ?anyerror = null, - - stat: Stat = undefined, - - const Writer = std.io.BufferedWriter(4096, std.fs.File.Writer); - const Self = @This(); - - fn writeErr(e: anyerror) noreturn { - ui.die("Error writing to file: {s}.\n", .{ ui.errorString(e) }); - } - - fn initFile(out: std.fs.File) *Self { - var buf = main.allocator.create(Writer) catch unreachable; - errdefer main.allocator.destroy(buf); - buf.* = std.io.bufferedWriter(out.writer()); - var wr = buf.writer(); - wr.writeAll("[1,2,{\"progname\":\"ncdu\",\"progver\":\"" ++ main.program_version ++ "\",\"timestamp\":") catch |e| writeErr(e); - wr.print("{d}", .{std.time.timestamp()}) catch |e| writeErr(e); - wr.writeByte('}') catch |e| writeErr(e); - - const self = main.allocator.create(Self) catch unreachable; - self.* = .{ .wr = buf }; - return self; - } - - fn initMem(dir: ?*model.Dir) *Self { - var self = main.allocator.create(Self) catch unreachable; - self.* = .{ - .parents = std.ArrayList(ScanDir).init(main.allocator), - .refreshing = dir, + fn create(fd: std.fs.Dir, dev: u64, pat: exclude.Patterns, s: *sink.Dir) *Dir { + const d = main.allocator.create(Dir) catch unreachable; + d.* = .{ + .fd = fd, + .dev = dev, + .pat = pat, + .sink = s, + .it = fd.iterate(), }; - if (dir) |d| self.parents.?.append(ScanDir.init(d)) catch unreachable; - return self; + return d; } - fn final(self: *Self) void { - if (self.parents) |_| { - counting_hardlinks = true; - defer counting_hardlinks = false; - main.handleEvent(false, true); - model.inodes.addAllStats(); - var p = self.refreshing; - while (p) |d| : (p = d.parent) d.updateSubErr(); - } - if (self.wr) |wr| { - wr.writer().writeByte(']') catch |e| writeErr(e); - wr.flush() catch |e| writeErr(e); - } - } - - // Add the name of the file/dir entry we're currently inspecting - fn pushPath(self: *Self, name: []const u8) void { - self.path_indices.append(self.path.items.len) catch unreachable; - if (self.path.items.len > 1) self.path.append('/') catch unreachable; - const start = self.path.items.len; - self.path.appendSlice(name) catch unreachable; - - self.path.append(0) catch unreachable; - self.name = self.path.items[start..self.path.items.len-1:0]; - self.path.items.len -= 1; - } - - fn popPath(self: *Self) void { - self.path.items.len = self.path_indices.pop(); - - if (self.stat.dir) { - if (self.parents) |*p| { - if (p.items.len > 0) { - var d = p.pop(); - d.final(); - d.deinit(); - } - } - if (self.wr) |w| w.writer().writeByte(']') catch |e| writeErr(e); - } else - self.stat.dir = true; // repeated popPath()s mean we're closing parent dirs. - } - - fn pathZ(self: *Self) [:0]const u8 { - return util.arrayListBufZ(&self.path); - } - - // Set a flag to indicate that there was an error listing file entries in the current directory. - // (Such errors are silently ignored when exporting to a file, as the directory metadata has already been written) - fn setDirlistError(self: *Self) void { - if (self.parents) |*p| p.items[p.items.len-1].dir.pack.err = true; - } - - const Special = enum { err, other_fs, kernfs, excluded }; - - fn writeSpecial(self: *Self, w: anytype, t: Special) !void { - try w.writeAll(",\n"); - if (self.stat.dir) try w.writeByte('['); - try w.writeAll("{\"name\":"); - try writeJsonString(w, self.name); - switch (t) { - .err => try w.writeAll(",\"read_error\":true"), - .other_fs => try w.writeAll(",\"excluded\":\"othfs\""), - .kernfs => try w.writeAll(",\"excluded\":\"kernfs\""), - .excluded => try w.writeAll(",\"excluded\":\"pattern\""), - } - try w.writeByte('}'); - if (self.stat.dir) try w.writeByte(']'); - } - - // Insert the current path as a special entry (i.e. a file/dir that is not counted) - // Ignores self.stat except for the 'dir' option. - fn addSpecial(self: *Self, t: Special) void { - if (t == .err) { - if (self.last_error) |p| main.allocator.free(p); - self.last_error = main.allocator.dupeZ(u8, self.path.items) catch unreachable; - } - - if (self.parents) |*p| - p.items[p.items.len-1].addSpecial(self.name, t) - else if (self.wr) |wr| - self.writeSpecial(wr.writer(), t) catch |e| writeErr(e); - - self.stat.dir = false; // So that popPath() doesn't consider this as leaving a dir. - self.items_seen += 1; - } - - fn writeStat(self: *Self, w: anytype, dir_dev: u64) !void { - try w.writeAll(",\n"); - if (self.stat.dir) try w.writeByte('['); - try w.writeAll("{\"name\":"); - try writeJsonString(w, self.name); - if (self.stat.size > 0) try w.print(",\"asize\":{d}", .{ self.stat.size }); - if (self.stat.blocks > 0) try w.print(",\"dsize\":{d}", .{ util.blocksToSize(self.stat.blocks) }); - if (self.stat.dir and self.stat.dev != dir_dev) try w.print(",\"dev\":{d}", .{ self.stat.dev }); - if (self.stat.hlinkc) try w.print(",\"ino\":{d},\"hlnkc\":true,\"nlink\":{d}", .{ self.stat.ino, self.stat.nlink }); - if (!self.stat.dir and !self.stat.reg) try w.writeAll(",\"notreg\":true"); - if (main.config.extended) - try w.print(",\"uid\":{d},\"gid\":{d},\"mode\":{d},\"mtime\":{d}", - .{ self.stat.ext.uid, self.stat.ext.gid, self.stat.ext.mode, self.stat.ext.mtime }); - try w.writeByte('}'); - } - - // Insert current path as a counted file/dir/hardlink, with information from self.stat - fn addStat(self: *Self, dir_dev: u64) void { - if (self.parents) |*p| { - var e = if (p.items.len == 0) blk: { - // Root entry - var e = model.Entry.create(.dir, main.config.extended, self.name); - e.pack.blocks = self.stat.blocks; - e.size = self.stat.size; - if (e.ext()) |ext| ext.* = self.stat.ext; - model.root = e.dir().?; - model.root.pack.dev = model.devices.getId(self.stat.dev); - break :blk e; - } else - p.items[p.items.len-1].addStat(self.name, &self.stat); - - if (e.dir()) |d| // Enter the directory - p.append(ScanDir.init(d)) catch unreachable; - - } else if (self.wr) |wr| - self.writeStat(wr.writer(), dir_dev) catch |e| writeErr(e); - - self.items_seen += 1; - } - - fn deinit(self: *Self) void { - if (self.last_error) |p| main.allocator.free(p); - if (self.parents) |*p| { - for (p.items) |*i| i.deinit(); - p.deinit(); - } - if (self.wr) |p| main.allocator.destroy(p); - self.path.deinit(); - self.path_indices.deinit(); - main.allocator.destroy(self); + fn destroy(d: *Dir) void { + d.pat.deinit(); + d.fd.close(); + d.sink.unref(); + main.allocator.destroy(d); } }; -// Context that is currently being used for scanning. -var active_context: *Context = undefined; +const Thread = struct { + thread_num: usize, + sink: *sink.Thread, + state: *State, + stack: std.ArrayList(*Dir) = std.ArrayList(*Dir).init(main.allocator), + thread: std.Thread = undefined, + namebuf: [4096]u8 = undefined, -// Read and index entries of the given dir. -fn scanDir(ctx: *Context, pat: *const exclude.Patterns, dir: std.fs.Dir, dir_dev: u64) void { - var it = main.allocator.create(std.fs.Dir.Iterator) catch unreachable; - defer main.allocator.destroy(it); - it.* = dir.iterate(); - while(true) { - const entry = it.next() catch { - ctx.setDirlistError(); + fn scanOne(t: *Thread, dir: *Dir, name_: []const u8) void { + if (name_.len > t.namebuf.len - 1) { + dir.sink.addSpecial(t.sink, name_, .err); return; - } orelse break; - - ctx.stat.dir = false; - ctx.pushPath(entry.name); - defer ctx.popPath(); - main.handleEvent(false, false); - - const excluded = pat.match(ctx.name); - if (excluded == false) { // matched either a file or directory, so we can exclude this before stat()ing. - ctx.addSpecial(.excluded); - continue; } - ctx.stat = Stat.read(dir, ctx.name, false) catch { - ctx.addSpecial(.err); - continue; + @memcpy(t.namebuf[0..name_.len], name_); + t.namebuf[name_.len] = 0; + const name = t.namebuf[0..name_.len:0]; + + const excluded = dir.pat.match(name); + if (excluded == false) { // matched either a file or directory, so we can exclude this before stat()ing. + dir.sink.addSpecial(t.sink, name, .excluded); + return; + } + + var stat = statAt(dir.fd, name, false) catch { + dir.sink.addSpecial(t.sink, name, .err); + return; }; - if (main.config.follow_symlinks and ctx.stat.symlink) { - if (Stat.read(dir, ctx.name, true)) |nstat| { + if (main.config.follow_symlinks and stat.symlink) { + if (statAt(dir.fd, name, true)) |nstat| { if (!nstat.dir) { - ctx.stat = nstat; + stat = nstat; // Symlink targets may reside on different filesystems, // this will break hardlink detection and counting so let's disable it. - if (ctx.stat.hlinkc and ctx.stat.dev != dir_dev) - ctx.stat.hlinkc = false; + if (stat.hlinkc and stat.dev != dir.dev) { + stat.hlinkc = false; + stat.nlink = 1; + } } } else |_| {} } - if (main.config.same_fs and ctx.stat.dev != dir_dev) { - ctx.addSpecial(.other_fs); - continue; + + if (main.config.same_fs and stat.dev != dir.dev) { + dir.sink.addSpecial(t.sink, name, .other_fs); + return; } - if (excluded) |e| if (e and ctx.stat.dir) { - ctx.addSpecial(.excluded); - continue; + + if (!stat.dir) { + dir.sink.addStat(t.sink, name, &stat); + return; + } + + if (excluded == true) { + dir.sink.addSpecial(t.sink, name, .excluded); + return; + } + + var edir = dir.fd.openDirZ(name, .{ .no_follow = true, .iterate = true }) catch { + dir.sink.addSpecial(t.sink, name, .err); + return; }; - var edir = - if (!ctx.stat.dir) null - else dir.openDirZ(ctx.name, .{ .no_follow = true, .iterate = true }) catch { - ctx.addSpecial(.err); - continue; - }; - defer if (edir != null) edir.?.close(); - - if (@import("builtin").os.tag == .linux and main.config.exclude_kernfs and ctx.stat.dir and isKernfs(edir.?, ctx.stat.dev)) { - ctx.addSpecial(.kernfs); - continue; + if (@import("builtin").os.tag == .linux + and main.config.exclude_kernfs + and stat.dev != dir.dev + and isKernfs(edir, stat.dev) + ) { + edir.close(); + dir.sink.addSpecial(t.sink, name, .kernfs); + return; } - if (main.config.exclude_caches and ctx.stat.dir) { - if (edir.?.openFileZ("CACHEDIR.TAG", .{})) |f| { - defer f.close(); - const sig = "Signature: 8a477f597d28d172789f06886806bc55"; - var buf: [sig.len]u8 = undefined; - if (f.reader().readAll(&buf)) |len| { - if (len == sig.len and std.mem.eql(u8, &buf, sig)) { - ctx.addSpecial(.excluded); - continue; - } - } else |_| {} - } else |_| {} + if (main.config.exclude_caches and isCacheDir(edir)) { + dir.sink.addSpecial(t.sink, name, .excluded); + edir.close(); + return; } - ctx.addStat(dir_dev); - if (ctx.stat.dir) { - var subpat = pat.enter(ctx.name); - defer subpat.deinit(); - scanDir(ctx, &subpat, edir.?, ctx.stat.dev); - } - } -} - -pub fn scanRoot(path: []const u8, out: ?std.fs.File) !void { - active_context = if (out) |f| Context.initFile(f) else Context.initMem(null); - - const full_path = std.fs.realpathAlloc(main.allocator, path) catch null; - defer if (full_path) |p| main.allocator.free(p); - active_context.pushPath(full_path orelse path); - - active_context.stat = try Stat.read(std.fs.cwd(), active_context.pathZ(), true); - if (!active_context.stat.dir) return error.NotDir; - active_context.addStat(0); - scan(); -} - -pub fn setupRefresh(parent: *model.Dir) void { - active_context = Context.initMem(parent); - var full_path = std.ArrayList(u8).init(main.allocator); - defer full_path.deinit(); - parent.fmtPath(true, &full_path); - active_context.pushPath(full_path.items); - active_context.stat.dir = true; - active_context.stat.dev = model.devices.list.items[parent.pack.dev]; -} - -// To be called after setupRefresh() (or from scanRoot()) -pub fn scan() void { - defer active_context.deinit(); - var dir = std.fs.cwd().openDirZ(active_context.pathZ(), .{ .iterate = true }) catch |e| { - active_context.last_error = main.allocator.dupeZ(u8, active_context.path.items) catch unreachable; - active_context.fatal_error = e; - while (main.state == .refresh or main.state == .scan) - main.handleEvent(true, true); - return; - }; - defer dir.close(); - var pat = exclude.getPatterns(active_context.pathZ()); - defer pat.deinit(); - scanDir(active_context, &pat, dir, active_context.stat.dev); - active_context.popPath(); - active_context.final(); -} - -// Using a custom recursive descent JSON parser here. std.json is great, but -// has two major downsides: -// - It does strict UTF-8 validation. Which is great in general, but not so -// much for ncdu dumps that may contain non-UTF-8 paths encoded as strings. -// - The streaming parser requires complex and overly large buffering in order -// to read strings, which doesn't work so well in our case. -// -// TODO: This code isn't very elegant and is likely contains bugs. It may be -// worth factoring out the JSON parts into a separate abstraction for which -// tests can be written. -const Import = struct { - ctx: *Context, - - rd: std.fs.File, - rdoff: usize = 0, - rdsize: usize = 0, - rdbuf: [8*1024]u8 = undefined, - - ch: u8 = 0, // last read character, 0 = EOF (or invalid null byte, who cares) - byte: u64 = 1, - line: u64 = 1, - namebuf: [32*1024]u8 = undefined, - - const Self = @This(); - - fn die(self: *Self, str: []const u8) noreturn { - ui.die("Error importing file on line {}:{}: {s}.\n", .{ self.line, self.byte, str }); + const s = dir.sink.addDir(t.sink, name, &stat); + const ndir = Dir.create(edir, stat.dev, dir.pat.enter(name), s); + if (main.config.threads == 1 or !t.state.tryPush(ndir)) + t.stack.append(ndir) catch unreachable; } - // Advance to the next byte, sets ch. - fn con(self: *Self) void { - if (self.rdoff >= self.rdsize) { - self.rdoff = 0; - self.rdsize = self.rd.read(&self.rdbuf) catch |e| switch (e) { - error.InputOutput => self.die("I/O error"), - error.IsDir => self.die("not a file"), // should be detected at open() time, but no flag for that... - error.SystemResources => self.die("out of memory"), - else => unreachable, - }; - if (self.rdsize == 0) { - self.ch = 0; - return; + fn run(t: *Thread) void { + defer t.stack.deinit(); + while (t.state.waitPop()) |dir| { + t.stack.append(dir) catch unreachable; + + while (t.stack.items.len > 0) { + const d = t.stack.items[t.stack.items.len - 1]; + + t.sink.setDir(d.sink); + if (t.thread_num == 0) main.handleEvent(false, false); + + const entry = d.it.next() catch blk: { + dir.sink.setReadError(t.sink); + break :blk null; + }; + if (entry) |e| t.scanOne(d, e.name) + else { + t.sink.setDir(null); + t.stack.pop().destroy(); + } } } - // Zig 0.10 copies the entire array to the stack in ReleaseSafe mode, - // work around that bug by indexing into a pointer to the array - // instead. - self.ch = (&self.rdbuf)[self.rdoff]; - self.rdoff += 1; - self.byte += 1; - } - - // Advance to the next non-whitespace byte. - fn conws(self: *Self) void { - while (true) { - switch (self.ch) { - '\n' => { - self.line += 1; - self.byte = 1; - }, - ' ', '\t', '\r' => {}, - else => break, - } - self.con(); - } - } - - // Returns the current byte and advances to the next. - fn next(self: *Self) u8 { - defer self.con(); - return self.ch; - } - - fn hexdig(self: *Self) u16 { - return switch (self.ch) { - '0'...'9' => self.next() - '0', - 'a'...'f' => self.next() - 'a' + 10, - 'A'...'F' => self.next() - 'A' + 10, - else => self.die("invalid hex digit"), - }; - } - - // Read a string into buf. - // Any characters beyond the size of the buffer are consumed but otherwise discarded. - // (May store fewer characters in the case of \u escapes, it's not super precise) - fn string(self: *Self, buf: []u8) []u8 { - if (self.next() != '"') self.die("expected '\"'"); - var n: usize = 0; - while (true) { - const ch = self.next(); - switch (ch) { - '"' => break, - '\\' => switch (self.next()) { - '"' => if (n < buf.len) { buf[n] = '"'; n += 1; }, - '\\'=> if (n < buf.len) { buf[n] = '\\';n += 1; }, - '/' => if (n < buf.len) { buf[n] = '/'; n += 1; }, - 'b' => if (n < buf.len) { buf[n] = 0x8; n += 1; }, - 'f' => if (n < buf.len) { buf[n] = 0xc; n += 1; }, - 'n' => if (n < buf.len) { buf[n] = 0xa; n += 1; }, - 'r' => if (n < buf.len) { buf[n] = 0xd; n += 1; }, - 't' => if (n < buf.len) { buf[n] = 0x9; n += 1; }, - 'u' => { - const char = (self.hexdig()<<12) + (self.hexdig()<<8) + (self.hexdig()<<4) + self.hexdig(); - if (n + 6 < buf.len) - n += std.unicode.utf8Encode(char, buf[n..n+5]) catch unreachable; - }, - else => self.die("invalid escape sequence"), - }, - 0x20, 0x21, 0x23...0x5b, 0x5d...0xff => if (n < buf.len) { buf[n] = ch; n += 1; }, - else => self.die("invalid character in string"), - } - } - return buf[0..n]; - } - - fn uint(self: *Self, T: anytype) T { - if (self.ch == '0') { - self.con(); - return 0; - } - var v: T = 0; - while (self.ch >= '0' and self.ch <= '9') { - const newv = v *% 10 +% (self.ch - '0'); - if (newv < v) self.die("integer out of range"); - v = newv; - self.con(); - } - if (v == 0) self.die("expected number"); - return v; - } - - fn boolean(self: *Self) bool { - switch (self.next()) { - 't' => { - if (self.next() == 'r' and self.next() == 'u' and self.next() == 'e') - return true; - }, - 'f' => { - if (self.next() == 'a' and self.next() == 'l' and self.next() == 's' and self.next() == 'e') - return false; - }, - else => {} - } - self.die("expected boolean"); - } - - // Consume and discard any JSON value. - fn conval(self: *Self) void { - switch (self.ch) { - 't' => _ = self.boolean(), - 'f' => _ = self.boolean(), - 'n' => { - self.con(); - if (!(self.next() == 'u' and self.next() == 'l' and self.next() == 'l')) - self.die("invalid JSON value"); - }, - '"' => _ = self.string(&[0]u8{}), - '{' => { - self.con(); - self.conws(); - if (self.ch == '}') { self.con(); return; } - while (true) { - self.conws(); - _ = self.string(&[0]u8{}); - self.conws(); - if (self.next() != ':') self.die("expected ':'"); - self.conws(); - self.conval(); - self.conws(); - switch (self.next()) { - ',' => continue, - '}' => break, - else => self.die("expected ',' or '}'"), - } - } - }, - '[' => { - self.con(); - self.conws(); - if (self.ch == ']') { self.con(); return; } - while (true) { - self.conws(); - self.conval(); - self.conws(); - switch (self.next()) { - ',' => continue, - ']' => break, - else => self.die("expected ',' or ']'"), - } - } - }, - '-', '0'...'9' => { - self.con(); - // Numbers are kind of annoying, this "parsing" is invalid and ultra-lazy. - while (true) { - switch (self.ch) { - '-', '+', 'e', 'E', '.', '0'...'9' => self.con(), - else => return, - } - } - }, - else => self.die("invalid JSON value"), - } - } - - fn itemkey(self: *Self, key: []const u8, name: *?[]u8, special: *?Context.Special) void { - const eq = std.mem.eql; - switch (if (key.len > 0) key[0] else @as(u8,0)) { - 'a' => { - if (eq(u8, key, "asize")) { - self.ctx.stat.size = self.uint(u64); - return; - } - }, - 'd' => { - if (eq(u8, key, "dsize")) { - self.ctx.stat.blocks = @intCast(self.uint(u64)>>9); - return; - } - if (eq(u8, key, "dev")) { - self.ctx.stat.dev = self.uint(u64); - return; - } - }, - 'e' => { - if (eq(u8, key, "excluded")) { - var buf: [32]u8 = undefined; - const typ = self.string(&buf); - // "frmlnk" is also possible, but currently considered equivalent to "pattern". - if (eq(u8, typ, "otherfs")) special.* = .other_fs - else if (eq(u8, typ, "kernfs")) special.* = .kernfs - else special.* = .excluded; - return; - } - }, - 'g' => { - if (eq(u8, key, "gid")) { - self.ctx.stat.ext.gid = self.uint(u32); - return; - } - }, - 'h' => { - if (eq(u8, key, "hlnkc")) { - self.ctx.stat.hlinkc = self.boolean(); - return; - } - }, - 'i' => { - if (eq(u8, key, "ino")) { - self.ctx.stat.ino = self.uint(u64); - return; - } - }, - 'm' => { - if (eq(u8, key, "mode")) { - self.ctx.stat.ext.mode = self.uint(u16); - return; - } - if (eq(u8, key, "mtime")) { - self.ctx.stat.ext.mtime = self.uint(u64); - // Accept decimal numbers, but discard the fractional part because our data model doesn't support it. - if (self.ch == '.') { - self.con(); - while (self.ch >= '0' and self.ch <= '9') - self.con(); - } - return; - } - }, - 'n' => { - if (eq(u8, key, "name")) { - if (name.* != null) self.die("duplicate key"); - name.* = self.string(&self.namebuf); - if (name.*.?.len > self.namebuf.len-5) self.die("too long file name"); - return; - } - if (eq(u8, key, "nlink")) { - self.ctx.stat.nlink = self.uint(u31); - if (!self.ctx.stat.dir and self.ctx.stat.nlink > 1) - self.ctx.stat.hlinkc = true; - return; - } - if (eq(u8, key, "notreg")) { - self.ctx.stat.reg = !self.boolean(); - return; - } - }, - 'r' => { - if (eq(u8, key, "read_error")) { - if (self.boolean()) - special.* = .err; - return; - } - }, - 'u' => { - if (eq(u8, key, "uid")) { - self.ctx.stat.ext.uid = self.uint(u32); - return; - } - }, - else => {}, - } - self.conval(); - } - - fn iteminfo(self: *Self, dir_dev: u64) void { - if (self.next() != '{') self.die("expected '{'"); - self.ctx.stat.dev = dir_dev; - var name: ?[]u8 = null; - var special: ?Context.Special = null; - while (true) { - self.conws(); - var keybuf: [32]u8 = undefined; - const key = self.string(&keybuf); - self.conws(); - if (self.next() != ':') self.die("expected ':'"); - self.conws(); - self.itemkey(key, &name, &special); - self.conws(); - switch (self.next()) { - ',' => continue, - '}' => break, - else => self.die("expected ',' or '}'"), - } - } - if (name) |n| self.ctx.pushPath(n) - else self.die("missing \"name\" field"); - if (special) |s| self.ctx.addSpecial(s) - else self.ctx.addStat(dir_dev); - } - - fn item(self: *Self, dev: u64) void { - self.ctx.stat = .{}; - var isdir = false; - if (self.ch == '[') { - isdir = true; - self.ctx.stat.dir = true; - self.con(); - self.conws(); - } - - self.iteminfo(dev); - - self.conws(); - if (isdir) { - const ndev = self.ctx.stat.dev; - while (self.ch == ',') { - self.con(); - self.conws(); - self.item(ndev); - self.conws(); - } - if (self.next() != ']') self.die("expected ',' or ']'"); - } - self.ctx.popPath(); - - if ((self.ctx.items_seen & 1023) == 0) - main.handleEvent(false, false); - } - - fn root(self: *Self) void { - self.con(); - self.conws(); - if (self.next() != '[') self.die("expected '['"); - self.conws(); - if (self.uint(u16) != 1) self.die("incompatible major format version"); - self.conws(); - if (self.next() != ',') self.die("expected ','"); - self.conws(); - _ = self.uint(u16); // minor version, ignored for now - self.conws(); - if (self.next() != ',') self.die("expected ','"); - self.conws(); - // metadata object - if (self.ch != '{') self.die("expected '{'"); - self.conval(); // completely discarded - self.conws(); - if (self.next() != ',') self.die("expected ','"); - self.conws(); - // root element - if (self.ch != '[') self.die("expected '['"); // top-level entry must be a dir - self.item(0); - self.conws(); - // any trailing elements - while (self.ch == ',') { - self.con(); - self.conws(); - self.conval(); - self.conws(); - } - if (self.next() != ']') self.die("expected ',' or ']'"); - self.conws(); - if (self.ch != 0) self.die("trailing garbage"); } }; -pub fn importRoot(path: [:0]const u8, out: ?std.fs.File) void { - const fd = if (std.mem.eql(u8, "-", path)) std.io.getStdIn() - else std.fs.cwd().openFileZ(path, .{}) - catch |e| ui.die("Error reading file: {s}.\n", .{ui.errorString(e)}); - defer fd.close(); - active_context = if (out) |f| Context.initFile(f) else Context.initMem(null); - var imp = Import{ .ctx = active_context, .rd = fd }; - defer imp.ctx.deinit(); - imp.root(); - imp.ctx.final(); -} - -var animation_pos: u32 = 0; -var counting_hardlinks: bool = false; -var need_confirm_quit = false; - -fn drawError(err: anyerror) void { - const width = ui.cols -| 5; - const box = ui.Box.create(7, width, "Scan error"); - - box.move(2, 2); - ui.addstr("Path: "); - ui.addstr(ui.shorten(ui.toUtf8(active_context.last_error.?), width -| 10)); - - box.move(3, 2); - ui.addstr("Error: "); - ui.addstr(ui.shorten(ui.errorString(err), width -| 6)); - - box.move(5, width -| 27); - ui.addstr("Press any key to continue"); -} - -fn drawCounting() void { - const box = ui.Box.create(4, 25, "Finalizing"); - box.move(2, 2); - ui.addstr("Counting hardlinks..."); -} - -fn drawBox() void { - ui.init(); - const ctx = active_context; - if (ctx.fatal_error) |err| return drawError(err); - if (counting_hardlinks) return drawCounting(); - const width = ui.cols -| 5; - const box = ui.Box.create(10, width, "Scanning..."); - box.move(2, 2); - ui.addstr("Total items: "); - ui.addnum(.default, ctx.items_seen); - - if (width > 48 and ctx.parents != null) { - box.move(2, 30); - ui.addstr("size: "); - // TODO: Should display the size of the dir-to-be-refreshed on refreshing, not the root. - ui.addsize(.default, util.blocksToSize(model.root.entry.pack.blocks +| model.inodes.total_blocks)); - } - - box.move(3, 2); - ui.addstr("Current item: "); - ui.addstr(ui.shorten(ui.toUtf8(ctx.pathZ()), width -| 18)); - - if (ctx.last_error) |path| { - box.move(5, 2); - ui.style(.bold); - ui.addstr("Warning: "); - ui.style(.default); - ui.addstr("error scanning "); - ui.addstr(ui.shorten(ui.toUtf8(path), width -| 28)); - box.move(6, 3); - ui.addstr("some directory sizes may not be correct."); - } - - if (need_confirm_quit) { - box.move(8, width -| 20); - ui.addstr("Press "); - ui.style(.key); - ui.addch('y'); - ui.style(.default); - ui.addstr(" to confirm"); - } else { - box.move(8, width -| 18); - ui.addstr("Press "); - ui.style(.key); - ui.addch('q'); - ui.style(.default); - ui.addstr(" to abort"); - } - - if (main.config.update_delay < std.time.ns_per_s and width > 40) { - const txt = "Scanning..."; - animation_pos += 1; - if (animation_pos >= txt.len*2) animation_pos = 0; - if (animation_pos < txt.len) { - box.move(8, 2); - for (txt[0..animation_pos + 1]) |t| ui.addch(t); - } else { - var i: u32 = txt.len-1; - while (i > animation_pos-txt.len) : (i -= 1) { - box.move(8, 2+i); - ui.addch(txt[i]); - } - } - } -} - -pub fn draw() void { - if (active_context.fatal_error != null and main.config.scan_ui.? != .full) - ui.die("Error reading {s}: {s}\n", .{ active_context.last_error.?, ui.errorString(active_context.fatal_error.?) }); - switch (main.config.scan_ui.?) { - .none => {}, - .line => { - var buf: [256]u8 = undefined; - var line: []const u8 = undefined; - if (counting_hardlinks) { - line = "\x1b7\x1b[JCounting hardlinks...\x1b8"; - } else if (active_context.parents == null) { - line = std.fmt.bufPrint(&buf, "\x1b7\x1b[J{s: <63} {d:>9} files\x1b8", - .{ ui.shorten(ui.toUtf8(active_context.pathZ()), 63), active_context.items_seen } - ) catch return; - } else { - const r = ui.FmtSize.fmt(util.blocksToSize(model.root.entry.pack.blocks)); - line = std.fmt.bufPrint(&buf, "\x1b7\x1b[J{s: <51} {d:>9} files / {s}{s}\x1b8", - .{ ui.shorten(ui.toUtf8(active_context.pathZ()), 51), active_context.items_seen, r.num(), r.unit } - ) catch return; - } - const stderr = std.io.getStdErr(); - stderr.writeAll(line) catch {}; - }, - .full => drawBox(), - } -} - -pub fn keyInput(ch: i32) void { - if (active_context.fatal_error != null) { - if (main.state == .scan) ui.quit() - else main.state = .browse; - return; - } - if (need_confirm_quit) { - switch (ch) { - 'y', 'Y' => if (need_confirm_quit) ui.quit(), - else => need_confirm_quit = false, - } - return; - } - switch (ch) { - 'q' => if (main.config.confirm_quit) { need_confirm_quit = true; } else ui.quit(), - else => need_confirm_quit = false, - } +pub fn scan(path: [:0]const u8) !void { + const stat = try statAt(std.fs.cwd(), path, true); + const fd = try std.fs.cwd().openDirZ(path, .{ .iterate = true }); + + sink.state.threads = main.allocator.alloc(sink.Thread, main.config.threads) catch unreachable; + for (sink.state.threads) |*t| t.* = .{}; + defer main.allocator.free(sink.state.threads); + + var state = State{ + .threads = main.allocator.alloc(Thread, main.config.threads) catch unreachable, + }; + defer main.allocator.free(state.threads); + + const root = sink.createRoot(path, &stat); + const dir = Dir.create(fd, stat.dev, exclude.getPatterns(path), root); + _ = state.tryPush(dir); + + for (sink.state.threads, state.threads, 0..) |*s, *t, n| + t.* = .{ .sink = s, .state = &state, .thread_num = n }; + + // XXX: Continue with fewer threads on error? + for (state.threads[1..]) |*t| { + t.thread = std.Thread.spawn( + .{ .stack_size = 128 * 1024, .allocator = main.allocator }, Thread.run, .{t} + ) catch |e| ui.die("Error spawning thread: {}\n", .{e}); + } + state.threads[0].run(); + for (state.threads[1..]) |*t| t.thread.join(); } diff --git a/src/sink.zig b/src/sink.zig new file mode 100644 index 0000000..bae419f --- /dev/null +++ b/src/sink.zig @@ -0,0 +1,354 @@ +// SPDX-FileCopyrightText: Yorhel +// SPDX-License-Identifier: MIT + +const std = @import("std"); +const main = @import("main.zig"); +const model = @import("model.zig"); +const ui = @import("ui.zig"); +const util = @import("util.zig"); + +// "sink" in this case is where the scan/import results (from scan.zig and +// json_import.zig) are being forwarded to and processed. This code handles +// aggregating the tree structure into memory or exporting it as JSON. Also +// handles progress display. + +// API for sources: +// +// Single-threaded: +// +// dir = createRoot(name, stat) +// dir.addSpecial(name, opt) +// dir.addFile(name, stat) +// sub = dir.addDir(name, stat) +// (no dir.stuff here) +// sub.addstuff(); +// sub.unref(); +// dir.unref(); +// +// Multi-threaded interleaving: +// +// dir = createRoot(name, stat) +// dir.addSpecial(name, opt) +// dir.addFile(name, stat) +// sub = dir.addDir(...) +// sub.addstuff(); +// sub2 = dir.addDir(..); +// sub.unref(); +// dir.unref(); // <- no more direct descendants for x, but subdirs could still be active +// sub2.addStuff(); +// sub2.unref(); // <- this is where 'dir' is really done. +// +// Rule: +// No concurrent method calls on a single Dir object, but objects may be passed between threads. + + +// Concise stat struct for fields we're interested in, with the types used by the model. +pub const Stat = struct { + blocks: model.Blocks = 0, + size: u64 = 0, + dev: u64 = 0, + ino: u64 = 0, + nlink: u31 = 0, + hlinkc: bool = false, + dir: bool = false, + reg: bool = true, + symlink: bool = false, + ext: model.Ext = .{}, +}; + +pub const Special = enum { err, other_fs, kernfs, excluded }; + + +const MemDir = struct { + dir: *model.Dir, + entries: Map, + + own_blocks: model.Blocks, + own_bytes: u64, + + // Additional counts collected from subdirectories. Subdirs may run final() + // from separate threads so these need to be protected. + blocks: model.Blocks = 0, + bytes: u64 = 0, + items: u32 = 0, + mtime: u64 = 0, + suberr: bool = false, + lock: std.Thread.Mutex = .{}, + + const Map = std.HashMap(*model.Entry, void, HashContext, 80); + + const HashContext = struct { + pub fn hash(_: @This(), e: *model.Entry) u64 { + return std.hash.Wyhash.hash(0, e.name()); + } + pub fn eql(_: @This(), a: *model.Entry, b: *model.Entry) bool { + return a == b or std.mem.eql(u8, a.name(), b.name()); + } + }; + + const HashContextAdapted = struct { + pub fn hash(_: @This(), v: []const u8) u64 { + return std.hash.Wyhash.hash(0, v); + } + pub fn eql(_: @This(), a: []const u8, b: *model.Entry) bool { + return std.mem.eql(u8, a, b.name()); + } + }; + + fn init(dir: *model.Dir) MemDir { + var self = MemDir{ + .dir = dir, + .entries = Map.initContext(main.allocator, HashContext{}), + .own_blocks = dir.entry.pack.blocks, + .own_bytes = dir.entry.size, + }; + + var count: Map.Size = 0; + var it = dir.sub; + while (it) |e| : (it = e.next) count += 1; + self.entries.ensureUnusedCapacity(count) catch unreachable; + + it = dir.sub; + while (it) |e| : (it = e.next) + self.entries.putAssumeCapacity(e, {}); + return self; + } + + fn getEntry(self: *MemDir, alloc: std.mem.Allocator, etype: model.EType, isext: bool, name: []const u8) *model.Entry { + if (self.entries.getKeyAdapted(name, HashContextAdapted{})) |e| { + // XXX: In-place conversion may be possible in some cases. + if (e.pack.etype == etype and (!isext or e.pack.isext)) { + e.pack.isext = isext; + _ = self.entries.removeAdapted(name, HashContextAdapted{}); + return e; + } + } + const e = model.Entry.create(alloc, etype, isext, name); + e.next = self.dir.sub; + self.dir.sub = e; + return e; + } + + fn addSpecial(self: *MemDir, alloc: std.mem.Allocator, name: []const u8, t: Special) void { + self.dir.items += 1; + + const e = self.getEntry(alloc, .file, false, name); + e.file().?.pack = switch (t) { + .err => .{ .err = true }, + .other_fs => .{ .other_fs = true }, + .kernfs => .{ .kernfs = true }, + .excluded => .{ .excluded = true }, + }; + } + + fn addStat(self: *MemDir, alloc: std.mem.Allocator, name: []const u8, stat: *const Stat) *model.Entry { + self.dir.items +|= 1; + if (!stat.hlinkc) { + self.dir.entry.pack.blocks +|= stat.blocks; + self.dir.entry.size +|= stat.size; + } + + const etype = if (stat.dir) model.EType.dir + else if (stat.hlinkc) model.EType.link + else model.EType.file; + const e = self.getEntry(alloc, etype, main.config.extended, name); + e.pack.blocks = stat.blocks; + e.size = stat.size; + if (e.dir()) |d| { + d.parent = self.dir; + d.pack.dev = model.devices.getId(stat.dev); + } + if (e.file()) |f| f.pack = .{ .notreg = !stat.dir and !stat.reg }; + if (e.link()) |l| l.ino = stat.ino; // TODO: Add to inodes table + if (e.ext()) |ext| ext.* = stat.ext; + return e; + } + + fn setReadError(self: *MemDir) void { + self.dir.pack.err = true; + } + + fn final(self: *MemDir, parent: ?*MemDir) void { + // Remove entries we've not seen + if (self.entries.count() > 0) { + var it = &self.dir.sub; + while (it.*) |e| { + if (self.entries.contains(e)) it.* = e.next + else it = &e.next; + } + } + + // Grab counts collected from subdirectories + self.dir.entry.pack.blocks +|= self.blocks; + self.dir.entry.size +|= self.bytes; + self.dir.items +|= self.items; + if (self.suberr) self.dir.pack.suberr = true; + if (self.dir.entry.ext()) |e| { + if (self.mtime > e.mtime) self.mtime = e.mtime; + } + + // Add own counts to parent + if (parent) |p| { + p.lock.lock(); + defer p.lock.unlock(); + p.blocks +|= self.dir.entry.pack.blocks - self.own_blocks; + p.bytes +|= self.dir.entry.size - self.own_bytes; + p.items +|= self.dir.items; + if (self.dir.entry.ext()) |e| { + if (e.mtime > p.mtime) e.mtime = p.mtime; + } + if (self.suberr or self.dir.pack.err) p.suberr = true; + } + self.entries.deinit(); + } +}; + + +pub const Dir = struct { + refcnt: std.atomic.Value(usize) = std.atomic.Value(usize).init(1), + // (XXX: This allocation can be avoided when scanning to a MemDir) + name: []const u8, + parent: ?*Dir, + out: Out, + + const Out = union(enum) { + mem: MemDir, + }; + + pub fn addSpecial(d: *Dir, t: *Thread, name: []const u8, sp: Special) void { + _ = t.files_seen.fetchAdd(1, .monotonic); + switch (d.out) { + .mem => |*m| m.addSpecial(t.arena.allocator(), name, sp), + } + } + + pub fn addStat(d: *Dir, t: *Thread, name: []const u8, stat: *const Stat) void { + _ = t.files_seen.fetchAdd(1, .monotonic); + _ = t.bytes_seen.fetchAdd((stat.blocks *| 512) / @max(1, stat.nlink), .monotonic); + switch (d.out) { + .mem => |*m| _ = m.addStat(t.arena.allocator(), name, stat), + } + } + + pub fn addDir(d: *Dir, t: *Thread, name: []const u8, stat: *const Stat) *Dir { + _ = t.files_seen.fetchAdd(1, .monotonic); + _ = t.bytes_seen.fetchAdd(stat.blocks *| 512, .monotonic); + + const s = main.allocator.create(Dir) catch unreachable; + s.* = .{ + .name = main.allocator.dupe(u8, name) catch unreachable, + .parent = d, + .out = switch (d.out) { + .mem => |*m| .{ + .mem = MemDir.init(m.addStat(t.arena.allocator(), name, stat).dir().?) + }, + }, + }; + d.ref(); + return s; + } + + pub fn setReadError(d: *Dir, t: *Thread) void { + _ = t; + switch (d.out) { + .mem => |*m| m.setReadError(), + } + } + + fn path(d: *Dir) []const u8 { + var components = std.ArrayList([]const u8).init(main.allocator); + defer components.deinit(); + var it: ?*Dir = d; + while (it) |e| : (it = e.parent) components.append(e.name) catch unreachable; + + var out = std.ArrayList(u8).init(main.allocator); + var i: usize = components.items.len-1; + while (true) { + if (i != components.items.len-1 and !(out.items.len != 0 and out.items[out.items.len-1] == '/')) out.append('/') catch unreachable; + out.appendSlice(components.items[i]) catch unreachable; + if (i == 0) break; + i -= 1; + } + return out.toOwnedSlice() catch unreachable; + } + + fn ref(d: *Dir) void { + _ = d.refcnt.fetchAdd(1, .monotonic); + } + + pub fn unref(d: *Dir) void { + if (d.refcnt.fetchSub(1, .release) != 1) return; + d.refcnt.fence(.acquire); + + switch (d.out) { + .mem => |*m| m.final(if (d.parent) |p| &p.out.mem else null), + } + + if (d.parent) |p| p.unref(); + if (d.name.len > 0) main.allocator.free(d.name); + main.allocator.destroy(d); + } +}; + + +pub const Thread = struct { + current_dir: ?*Dir = null, + lock: std.Thread.Mutex = .{}, + bytes_seen: std.atomic.Value(u64) = std.atomic.Value(u64).init(0), + files_seen: std.atomic.Value(u64) = std.atomic.Value(u64).init(0), + // Arena allocator for model.Entry structs, these are never freed. + arena: std.heap.ArenaAllocator = std.heap.ArenaAllocator.init(std.heap.page_allocator), + + pub fn setDir(t: *Thread, d: ?*Dir) void { + t.lock.lock(); + defer t.lock.unlock(); + t.current_dir = d; + } +}; + + +pub const state = struct { + pub var threads: []Thread = undefined; +}; + + +pub fn createRoot(path: []const u8, stat: *const Stat) *Dir { + // TODO: Handle other outputs + model.root = model.Entry.create(main.allocator, .dir, main.config.extended, path).dir().?; + model.root.entry.pack.blocks = stat.blocks; + model.root.entry.size = stat.size; + model.root.pack.dev = model.devices.getId(stat.dev); + + const d = main.allocator.create(Dir) catch unreachable; + d.* = .{ + .name = main.allocator.dupe(u8, path) catch unreachable, + .parent = null, + .out = .{ .mem = MemDir.init(model.root) }, + }; + return d; +} + + +pub fn draw() void { + var bytes: u64 = 0; + var files: u64 = 0; + for (state.threads) |*t| { + bytes +|= t.bytes_seen.load(.monotonic); + files += t.files_seen.load(.monotonic); + } + const r = ui.FmtSize.fmt(bytes); + std.debug.print("{} files / {s}{s}\n", .{files, r.num(), r.unit}); + + for (state.threads, 0..) |*t, i| { + const dir = blk: { + t.lock.lock(); + defer t.lock.unlock(); + break :blk if (t.current_dir) |d| d.path() else null; + }; + std.debug.print(" #{}: {s}\n", .{i, dir orelse "(waiting)"}); + if (dir) |p| main.allocator.free(p); + } +} + +pub fn keyInput(_: i32) void { +} diff --git a/src/ui.zig b/src/ui.zig index 993f252..8192443 100644 --- a/src/ui.zig +++ b/src/ui.zig @@ -43,6 +43,7 @@ pub fn quit() noreturn { // Also, init() and other ncurses-related functions may have hidden allocation, // no clue if ncurses will consistently report OOM, but we're not handling that // right now. +// TODO: Make thread-safe! pub fn oom() void { const haveui = inited; deinit();