From 9b517f27b1d42d05830218cc9a630c61c4733df1 Mon Sep 17 00:00:00 2001 From: Yorhel Date: Wed, 17 Jul 2024 16:38:10 +0200 Subject: [PATCH] Add support for multithreaded scanning to JSON export by scanning into memory first. --- src/json_import.zig | 4 +-- src/mem_src.zig | 79 +++++++++++++++++++++++++++++++++++++++++++++ src/sink.zig | 43 ++++++++++++++++++------ 3 files changed, 113 insertions(+), 13 deletions(-) create mode 100644 src/mem_src.zig diff --git a/src/json_import.zig b/src/json_import.zig index 8b7b5c4..b7c3c43 100644 --- a/src/json_import.zig +++ b/src/json_import.zig @@ -306,7 +306,6 @@ test "JSON parser" { const Ctx = struct { p: *Parser, sink: *sink.Thread, - items_seen: u64 = 0, stat: sink.Stat = .{}, special: ?sink.Special = null, namelen: usize = 0, @@ -457,8 +456,7 @@ fn item(ctx: *Ctx, parent: ?*sink.Dir, dev: u64) void { parent.?.addStat(ctx.sink, name, &ctx.stat); } - ctx.items_seen += 1; - if ((ctx.items_seen & 1023) == 0) + if ((ctx.sink.files_seen.load(.monotonic) & 1023) == 0) main.handleEvent(false, false); } diff --git a/src/mem_src.zig b/src/mem_src.zig new file mode 100644 index 0000000..cb5ddb2 --- /dev/null +++ b/src/mem_src.zig @@ -0,0 +1,79 @@ +// SPDX-FileCopyrightText: Yorhel +// SPDX-License-Identifier: MIT + +const std = @import("std"); +const main = @import("main.zig"); +const model = @import("model.zig"); +const sink = @import("sink.zig"); + +// Emit the memory tree to the sink in depth-first order from a single thread, +// suitable for JSON export.
+/// Build a sink.Stat from an in-memory entry; some fields may stay undefined.
+fn toStat(e: *model.Entry) sink.Stat {
+    const el = e.link();
+    return sink.Stat{
+        .blocks = e.pack.blocks,
+        .size = e.size,
+        .dev =
+            if (e.dir()) |d| model.devices.list.items[d.pack.dev]
+            else if (el) |l| model.devices.list.items[l.parent.pack.dev]
+            else undefined,
+        .ino = if (el) |l| l.ino else undefined,
+        .nlink = if (el) |l| l.pack.nlink else undefined,
+        .hlinkc = el != null,
+        .dir = e.pack.etype == .dir,
+        .reg = if (e.file()) |f| !f.pack.notreg else e.pack.etype != .dir,
+        .symlink = undefined,
+        .ext = if (e.ext()) |x| x.* else .{},
+    };
+}
+
+const Ctx = struct {
+    sink: *sink.Thread, // sink thread that receives every entry
+    stat: sink.Stat, // scratch Stat, overwritten by rec() for each entry
+};
+
+/// Emit `entry` into `dir`, descending into subdirectories depth-first.
+fn rec(ctx: *Ctx, dir: *sink.Dir, entry: *model.Entry) void {
+    // Only call handleEvent() when the files_seen counter is a multiple of 1024.
+    if ((ctx.sink.files_seen.load(.monotonic) & 1023) == 0)
+        main.handleEvent(false, false);
+    ctx.stat = toStat(entry);
+    if (entry.dir()) |d| {
+        const ndir = dir.addDir(ctx.sink, entry.name(), &ctx.stat);
+        ctx.sink.setDir(ndir);
+        if (d.pack.err) ndir.setReadError(ctx.sink);
+        var it = d.sub;
+        while (it) |e| : (it = e.next) rec(ctx, ndir, e);
+        ctx.sink.setDir(dir);
+        ndir.unref();
+        return;
+    }
+    if (entry.file()) |f| {
+        if (f.pack.err) return dir.addSpecial(ctx.sink, entry.name(), .err);
+        if (f.pack.excluded) return dir.addSpecial(ctx.sink, entry.name(), .excluded);
+        if (f.pack.other_fs) return dir.addSpecial(ctx.sink, entry.name(), .other_fs);
+        if (f.pack.kernfs) return dir.addSpecial(ctx.sink, entry.name(), .kernfs);
+    }
+    dir.addStat(ctx.sink, entry.name(), &ctx.stat);
+}
+
+/// Feed the tree below `d` to the sink on one sink thread, then call sink.done().
+pub fn run(d: *model.Dir) void {
+    const sink_threads = sink.createThreads(1);
+    // Explicitly typed: rec() takes a *Ctx, which an anonymous struct is not.
+    var ctx: Ctx = .{
+        .sink = &sink_threads[0],
+        .stat = toStat(&d.entry),
+    };
+    var buf = std.ArrayList(u8).init(main.allocator);
+    d.fmtPath(true, &buf);
+    const root = sink.createRoot(buf.items, &ctx.stat);
+    buf.deinit();
+
+    var it = d.sub;
+    while (it) |e| : (it = e.next) rec(&ctx, root, e);
+
+    root.unref();
+    sink.done();
+}
diff --git a/src/sink.zig b/src/sink.zig
index f0ff844..34f6e06 100644
---
a/src/sink.zig +++ b/src/sink.zig @@ -4,6 +4,7 @@ const std = @import("std"); const main = @import("main.zig"); const model = @import("model.zig"); +const mem_src = @import("mem_src.zig"); const ui = @import("ui.zig"); const util = @import("util.zig"); @@ -267,13 +268,15 @@ const MemDir = struct { } fn addStat(self: *MemDir, alloc: std.mem.Allocator, name: []const u8, stat: *const Stat) *model.Entry { - self.dir.items +|= 1; - if (!stat.hlinkc) { - self.dir.entry.pack.blocks +|= stat.blocks; - self.dir.entry.size +|= stat.size; - } - if (self.dir.entry.ext()) |e| { - if (stat.ext.mtime > e.mtime) e.mtime = stat.ext.mtime; + if (state.defer_json == null) { + self.dir.items +|= 1; + if (!stat.hlinkc) { + self.dir.entry.pack.blocks +|= stat.blocks; + self.dir.entry.size +|= stat.size; + } + if (self.dir.entry.ext()) |e| { + if (stat.ext.mtime > e.mtime) e.mtime = stat.ext.mtime; + } } const etype = if (stat.dir) model.EType.dir @@ -312,6 +315,9 @@ const MemDir = struct { else it = &e.next; } } + self.entries.deinit(); + + if (state.defer_json != null) return; // Grab counts collected from subdirectories self.dir.entry.pack.blocks +|= self.blocks; @@ -334,7 +340,6 @@ const MemDir = struct { } if (self.suberr or self.dir.pack.suberr or self.dir.pack.err) p.suberr = true; } - self.entries.deinit(); } }; @@ -473,6 +478,7 @@ pub const state = struct { pub var status: enum { done, err, zeroing, hlcnt, running } = .running; pub var threads: []Thread = undefined; pub var out: Out = .{ .mem = null }; + pub var defer_json: ?*JsonWriter = null; pub var last_error: ?[:0]u8 = null; var last_error_lock = std.Thread.Mutex{}; @@ -492,7 +498,16 @@ pub fn setupJsonOutput(out: std.fs.File) void { // Must be the first thing to call from a source; initializes global state. 
pub fn createThreads(num: usize) []Thread { - std.debug.assert(num == 1 or state.out != .json); + switch (state.out) { + .mem => {}, + .json => |j| { + if (num > 1) { + state.out = state.Out{ .mem = null }; + state.defer_json = j; + } + }, + } + state.status = .running; if (state.last_error) |p| main.allocator.free(p); state.last_error = null; @@ -505,7 +520,7 @@ pub fn createThreads(num: usize) []Thread { // Must be the last thing to call from a source. pub fn done() void { switch (state.out) { - .mem => { + .mem => if (state.defer_json == null) { state.status = .hlcnt; main.handleEvent(false, true); const dir = state.out.mem orelse model.root; @@ -524,6 +539,14 @@ pub fn done() void { } state.status = .done; main.allocator.free(state.threads); + + // We scanned into memory, now we need to scan from memory to JSON + if (state.defer_json) |j| { + state.out = state.Out{ .json = j }; + state.defer_json = null; + mem_src.run(model.root); + } + // Clear the screen when done. if (main.config.scan_ui == .line) main.handleEvent(false, true); }