Add support for multithreaded scanning to JSON export

by scanning into memory first.
This commit is contained in:
Yorhel 2024-07-17 16:38:10 +02:00
parent 705bd8907d
commit 9b517f27b1
3 changed files with 113 additions and 13 deletions

View file

@@ -306,7 +306,6 @@ test "JSON parser" {
const Ctx = struct {
p: *Parser,
sink: *sink.Thread,
items_seen: u64 = 0,
stat: sink.Stat = .{},
special: ?sink.Special = null,
namelen: usize = 0,
@@ -457,8 +456,7 @@ fn item(ctx: *Ctx, parent: ?*sink.Dir, dev: u64) void {
parent.?.addStat(ctx.sink, name, &ctx.stat);
}
ctx.items_seen += 1;
if ((ctx.items_seen & 1023) == 0)
if ((ctx.sink.files_seen.load(.monotonic) & 1024) == 0)
main.handleEvent(false, false);
}

79
src/mem_src.zig Normal file
View file

@@ -0,0 +1,79 @@
// SPDX-FileCopyrightText: Yorhel <projects@yorhel.nl>
// SPDX-License-Identifier: MIT
const std = @import("std");
const main = @import("main.zig");
const model = @import("model.zig");
const sink = @import("sink.zig");
// Emit the memory tree to the sink in depth-first order from a single thread,
// suitable for JSON export.
// Translate an in-memory model entry into the sink's Stat representation.
// Fields the sink never reads for this entry type are left undefined.
fn toStat(e: *model.Entry) sink.Stat {
    const maybe_link = e.link();
    const is_dir = e.pack.etype == .dir;
    // Resolve the device id: directories carry their own device index,
    // hardlinks borrow their parent's; other entries never expose one.
    const dev = blk: {
        if (e.dir()) |d| break :blk model.devices.list.items[d.pack.dev];
        if (maybe_link) |l| break :blk model.devices.list.items[l.parent.pack.dev];
        break :blk undefined;
    };
    return .{
        .blocks = e.pack.blocks,
        .size = e.size,
        .dev = dev,
        .ino = if (maybe_link) |l| l.ino else undefined,
        .nlink = if (maybe_link) |l| l.pack.nlink else undefined,
        .hlinkc = maybe_link != null,
        .dir = is_dir,
        .reg = if (e.file()) |f| !f.pack.notreg else !is_dir,
        .symlink = undefined,
        .ext = if (e.ext()) |x| x.* else .{},
    };
}
// Per-traversal state threaded through `rec`.
const Ctx = struct {
    // Sink thread that receives every emitted entry.
    sink: *sink.Thread,
    // Scratch Stat for the entry currently being emitted; overwritten on
    // every visit, so it never outlives a single call.
    stat: sink.Stat,
};
// Recursively emit `entry` (and, for directories, its whole subtree) into
// `dir`, depth-first. Runs on a single thread; `ctx.stat` is reused as
// scratch space for each visited entry.
fn rec(ctx: *Ctx, dir: *sink.Dir, entry: *model.Entry) void {
    // Throttle UI event handling to roughly once per 1024 files seen.
    // `& 1023` masks the low ten bits; the previous `& 1024` tested a
    // single bit and thus fired for half of all counter values.
    if ((ctx.sink.files_seen.load(.monotonic) & 1023) == 0)
        main.handleEvent(false, false);
    ctx.stat = toStat(entry);
    if (entry.dir()) |d| {
        var ndir = dir.addDir(ctx.sink, entry.name(), &ctx.stat);
        ctx.sink.setDir(ndir);
        if (d.pack.err) ndir.setReadError(ctx.sink);
        // Walk the linked list of children before returning to `dir`.
        var it = d.sub;
        while (it) |e| : (it = e.next) rec(ctx, ndir, e);
        ctx.sink.setDir(dir);
        ndir.unref();
        return;
    }
    if (entry.file()) |f| {
        // Special (errored / excluded / foreign) files are reported as
        // such and carry no stat information.
        if (f.pack.err) return dir.addSpecial(ctx.sink, entry.name(), .err);
        if (f.pack.excluded) return dir.addSpecial(ctx.sink, entry.name(), .excluded);
        if (f.pack.other_fs) return dir.addSpecial(ctx.sink, entry.name(), .other_fs);
        if (f.pack.kernfs) return dir.addSpecial(ctx.sink, entry.name(), .kernfs);
    }
    dir.addStat(ctx.sink, entry.name(), &ctx.stat);
}
// Emit the in-memory tree rooted at `d` to the sink; used for JSON export
// after a (possibly multithreaded) scan has been collected in memory.
pub fn run(d: *model.Dir) void {
    const sink_threads = sink.createThreads(1);
    // The literal must be explicitly typed as `Ctx`: an inferred anonymous
    // struct type would not coerce to the `*Ctx` that `rec()` expects.
    var ctx: Ctx = .{
        .sink = &sink_threads[0],
        .stat = toStat(&d.entry),
    };
    // Format the root path into a temporary buffer for createRoot().
    var buf = std.ArrayList(u8).init(main.allocator);
    d.fmtPath(true, &buf);
    const root = sink.createRoot(buf.items, &ctx.stat);
    buf.deinit();
    // Walk all children of the root directory, then finish up.
    var it = d.sub;
    while (it) |e| : (it = e.next) rec(&ctx, root, e);
    root.unref();
    sink.done();
}

View file

@@ -4,6 +4,7 @@
const std = @import("std");
const main = @import("main.zig");
const model = @import("model.zig");
const mem_src = @import("mem_src.zig");
const ui = @import("ui.zig");
const util = @import("util.zig");
@@ -267,6 +268,7 @@ const MemDir = struct {
}
fn addStat(self: *MemDir, alloc: std.mem.Allocator, name: []const u8, stat: *const Stat) *model.Entry {
if (state.defer_json == null) {
self.dir.items +|= 1;
if (!stat.hlinkc) {
self.dir.entry.pack.blocks +|= stat.blocks;
@@ -275,6 +277,7 @@ const MemDir = struct {
if (self.dir.entry.ext()) |e| {
if (stat.ext.mtime > e.mtime) e.mtime = stat.ext.mtime;
}
}
const etype = if (stat.dir) model.EType.dir
else if (stat.hlinkc) model.EType.link
@@ -312,6 +315,9 @@ const MemDir = struct {
else it = &e.next;
}
}
self.entries.deinit();
if (state.defer_json != null) return;
// Grab counts collected from subdirectories
self.dir.entry.pack.blocks +|= self.blocks;
@@ -334,7 +340,6 @@
}
if (self.suberr or self.dir.pack.suberr or self.dir.pack.err) p.suberr = true;
}
self.entries.deinit();
}
};
@@ -473,6 +478,7 @@ pub const state = struct {
pub var status: enum { done, err, zeroing, hlcnt, running } = .running;
pub var threads: []Thread = undefined;
pub var out: Out = .{ .mem = null };
pub var defer_json: ?*JsonWriter = null;
pub var last_error: ?[:0]u8 = null;
var last_error_lock = std.Thread.Mutex{};
@@ -492,7 +498,16 @@ pub fn setupJsonOutput(out: std.fs.File) void {
// Must be the first thing to call from a source; initializes global state.
pub fn createThreads(num: usize) []Thread {
std.debug.assert(num == 1 or state.out != .json);
switch (state.out) {
.mem => {},
.json => |j| {
if (num > 1) {
state.out = state.Out{ .mem = null };
state.defer_json = j;
}
},
}
state.status = .running;
if (state.last_error) |p| main.allocator.free(p);
state.last_error = null;
@@ -505,7 +520,7 @@ pub fn createThreads(num: usize) []Thread {
// Must be the last thing to call from a source.
pub fn done() void {
switch (state.out) {
.mem => {
.mem => if (state.defer_json == null) {
state.status = .hlcnt;
main.handleEvent(false, true);
const dir = state.out.mem orelse model.root;
@@ -524,6 +539,14 @@ pub fn done() void {
}
state.status = .done;
main.allocator.free(state.threads);
// We scanned into memory, now we need to scan from memory to JSON
if (state.defer_json) |j| {
state.out = state.Out{ .json = j };
state.defer_json = null;
mem_src.run(model.root);
}
// Clear the screen when done.
if (main.config.scan_ui == .line) main.handleEvent(false, true);
}