// ncdu-zig/src/sink.zig

// SPDX-FileCopyrightText: Yorhel <projects@yorhel.nl>
// SPDX-License-Identifier: MIT
const std = @import("std");
const main = @import("main.zig");
const model = @import("model.zig");
const mem_src = @import("mem_src.zig");
const mem_sink = @import("mem_sink.zig");
const json_export = @import("json_export.zig");
const ui = @import("ui.zig");
const util = @import("util.zig");

// Terminology note:
//
// A "source" is where scan results come from; sources are implemented in
// scan.zig, mem_src.zig and json_import.zig.
//
// A "sink" is where scan results go to. This file provides a generic sink API
// for sources to use. The API forwards results to a specific sink
// implementation (mem_sink.zig or json_export.zig) and provides progress
// updates.
//
// API for sources:
//
// Single-threaded:
//
//   createThreads(1)
//   dir = createRoot(name, stat)
//   dir.addSpecial(name, opt)
//   dir.addStat(name, stat)
//   sub = dir.addDir(name, stat)
//     (no calls on 'dir' here)
//     sub.addStat(...)
//     sub.unref()
//   dir.unref()
//   done()
//
// Multi-threaded interleaving:
//
//   createThreads(n)
//   dir = createRoot(name, stat)
//   dir.addSpecial(name, opt)
//   dir.addStat(name, stat)
//   sub = dir.addDir(...)
//   sub.addStat(...)
//   sub2 = dir.addDir(...)
//   sub.unref()
//   dir.unref()   // <- no more direct descendants for 'dir', but subdirs may still be active
//   sub2.addStat(...)
//   sub2.unref()  // <- this is where 'dir' is really done.
//   done()
//
// Rule:
// No concurrent method calls on a single Dir object, but objects may be passed
// between threads.
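
// Illustrative sketch (not part of the original file): a minimal single-threaded
// source driving the sink API above. The path, names and stat values here are
// made up; real sources (scan.zig, mem_src.zig, json_import.zig) derive them
// from the filesystem or from imported JSON.
fn exampleSingleThreadedSource() void {
    const threads = createThreads(1);
    const t = &threads[0];

    const root_stat: Stat = .{ .dir = true, .reg = false };
    const root = createRoot("/example", &root_stat);

    // A regular file and an excluded entry directly below the root.
    const file_stat: Stat = .{ .size = 1234, .blocks = 8, .nlink = 1 };
    root.addStat(t, "example.txt", &file_stat);
    root.addSpecial(t, "excluded-dir", .excluded);

    // A (here empty) subdirectory: unref it before releasing the root.
    const sub_stat: Stat = .{ .dir = true, .reg = false };
    const sub = root.addDir(t, "subdir", &sub_stat);
    sub.unref();

    root.unref();
    done();
}
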
// Concise stat struct for fields we're interested in, with the types used by the model.
pub const Stat = struct {
    blocks: model.Blocks = 0,
    size: u64 = 0,
    dev: u64 = 0,
    ino: u64 = 0,
    nlink: u31 = 0,
    hlinkc: bool = false,
    dir: bool = false,
    reg: bool = true,
    symlink: bool = false,
    ext: model.Ext = .{},
};
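
// Illustrative sketch (not part of the original file): how a source might fill
// in a Stat from a POSIX stat() result. The exact conversions, and how the
// extended attributes in `ext` are populated, are up to the real sources and
// may differ from this.
fn exampleStatFromPosix(s: *const std.posix.Stat) Stat {
    const is_dir = std.posix.S.ISDIR(s.mode);
    return .{
        .blocks = @intCast(s.blocks),
        .size = @intCast(s.size),
        .dev = @intCast(s.dev),
        .ino = @intCast(s.ino),
        .nlink = std.math.lossyCast(u31, s.nlink),
        .hlinkc = s.nlink > 1 and !is_dir,
        .dir = is_dir,
        .reg = std.posix.S.ISREG(s.mode),
        .symlink = std.posix.S.ISLNK(s.mode),
    };
}
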
pub const Special = enum { err, other_fs, kernfs, excluded };

pub const Dir = struct {
    refcnt: std.atomic.Value(usize) = std.atomic.Value(usize).init(1),
    name: []const u8,
    parent: ?*Dir,
    out: Out,

    const Out = union(enum) {
        mem: mem_sink.Dir,
        json: json_export.Dir,
    };

    pub fn addSpecial(d: *Dir, t: *Thread, name: []const u8, sp: Special) void {
        _ = t.files_seen.fetchAdd(1, .monotonic);
        switch (d.out) {
            .mem => |*m| m.addSpecial(&t.sink.mem, name, sp),
            .json => |*j| j.addSpecial(name, sp),
        }
        if (sp == .err) {
            global.last_error_lock.lock();
            defer global.last_error_lock.unlock();
            if (global.last_error) |p| main.allocator.free(p);
            const p = d.path();
            global.last_error = std.fs.path.joinZ(main.allocator, &.{ p, name }) catch unreachable;
            main.allocator.free(p);
        }
    }

    pub fn addStat(d: *Dir, t: *Thread, name: []const u8, stat: *const Stat) void {
        _ = t.files_seen.fetchAdd(1, .monotonic);
        _ = t.addBytes((stat.blocks *| 512) / @max(1, stat.nlink));
        std.debug.assert(!stat.dir);
        switch (d.out) {
            .mem => |*m| _ = m.addStat(&t.sink.mem, name, stat),
            .json => |*j| j.addStat(name, stat),
        }
    }

    pub fn addDir(d: *Dir, t: *Thread, name: []const u8, stat: *const Stat) *Dir {
        _ = t.files_seen.fetchAdd(1, .monotonic);
        _ = t.addBytes(stat.blocks *| 512);
        std.debug.assert(stat.dir);
        std.debug.assert(d.out != .json or d.refcnt.load(.monotonic) == 1);
        const s = main.allocator.create(Dir) catch unreachable;
        s.* = .{
            .name = main.allocator.dupe(u8, name) catch unreachable,
            .parent = d,
            .out = switch (d.out) {
                .mem => |*m| .{ .mem = m.addDir(&t.sink.mem, name, stat) },
                .json => |*j| .{ .json = j.addDir(name, stat) },
            },
        };
        d.ref();
        return s;
    }

    pub fn setReadError(d: *Dir, t: *Thread) void {
        _ = t;
        switch (d.out) {
            .mem => |*m| m.setReadError(),
            .json => |*j| j.setReadError(),
        }
        global.last_error_lock.lock();
        defer global.last_error_lock.unlock();
        if (global.last_error) |p| main.allocator.free(p);
        global.last_error = d.path();
    }

    fn path(d: *Dir) [:0]u8 {
        var components = std.ArrayList([]const u8).init(main.allocator);
        defer components.deinit();
        var it: ?*Dir = d;
        while (it) |e| : (it = e.parent) components.append(e.name) catch unreachable;
        var out = std.ArrayList(u8).init(main.allocator);
        var i: usize = components.items.len - 1;
        while (true) {
            if (i != components.items.len - 1 and !(out.items.len != 0 and out.items[out.items.len - 1] == '/')) out.append('/') catch unreachable;
            out.appendSlice(components.items[i]) catch unreachable;
            if (i == 0) break;
            i -= 1;
        }
        return out.toOwnedSliceSentinel(0) catch unreachable;
    }

    fn ref(d: *Dir) void {
        _ = d.refcnt.fetchAdd(1, .monotonic);
    }

    pub fn unref(d: *Dir) void {
        if (d.refcnt.fetchSub(1, .release) != 1) return;
        d.refcnt.fence(.acquire);
        switch (d.out) {
            .mem => |*m| m.final(if (d.parent) |p| &p.out.mem else null),
            .json => |*j| j.final(),
        }
        if (d.parent) |p| p.unref();
        if (d.name.len > 0) main.allocator.free(d.name);
        main.allocator.destroy(d);
    }
};

pub const Thread = struct {
    current_dir: ?*Dir = null,
    lock: std.Thread.Mutex = .{},
    // On 32-bit architectures, bytes_seen is protected by the above mutex instead.
    bytes_seen: std.atomic.Value(u64) = std.atomic.Value(u64).init(0),
    files_seen: std.atomic.Value(u32) = std.atomic.Value(u32).init(0),
    sink: union {
        mem: mem_sink.Thread,
        json: void,
    } = .{ .mem = .{} },

    fn addBytes(t: *Thread, bytes: u64) void {
        if (@bitSizeOf(usize) >= 64) _ = t.bytes_seen.fetchAdd(bytes, .monotonic)
        else {
            t.lock.lock();
            defer t.lock.unlock();
            t.bytes_seen.raw += bytes;
        }
    }

    fn getBytes(t: *Thread) u64 {
        if (@bitSizeOf(usize) >= 64) return t.bytes_seen.load(.monotonic)
        else {
            t.lock.lock();
            defer t.lock.unlock();
            return t.bytes_seen.raw;
        }
    }

    pub fn setDir(t: *Thread, d: ?*Dir) void {
        t.lock.lock();
        defer t.lock.unlock();
        t.current_dir = d;
    }
};
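
// Illustrative sketch (not part of the original file): a scanning thread is
// expected to point its Thread at the directory it is currently reading via
// setDir(), so that drawConsole() and drawProgress() below can display a live
// path per thread. The helper name is hypothetical.
fn exampleEnterDir(t: *Thread, parent: *Dir, name: []const u8, stat: *const Stat) *Dir {
    const sub = parent.addDir(t, name, stat);
    t.setDir(sub); // shown as this thread's current path in the progress UI
    return sub;
}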

pub const global = struct {
    pub var state: enum { done, err, zeroing, hlcnt, running } = .running;
    pub var threads: []Thread = undefined;
    pub var sink: enum { json, mem } = .mem;
    pub var last_error: ?[:0]u8 = null;
    var last_error_lock = std.Thread.Mutex{};
    var need_confirm_quit = false;
};

// Must be the first thing to call from a source; initializes global state.
pub fn createThreads(num: usize) []Thread {
    // JSON export does not support multiple threads; scan into memory first.
    if (global.sink == .json and num > 1) {
        global.sink = .mem;
        mem_sink.global.stats = false;
    }
    global.state = .running;
    if (global.last_error) |p| main.allocator.free(p);
    global.last_error = null;
    global.threads = main.allocator.alloc(Thread, num) catch unreachable;
    for (global.threads) |*t| t.* = .{};
    return global.threads;
}

// Must be the last thing to call from a source.
pub fn done() void {
    switch (global.sink) {
        .mem => mem_sink.done(),
        .json => json_export.done(),
    }
    global.state = .done;
    main.allocator.free(global.threads);

    // We scanned into memory first; now scan from memory into the JSON export.
    if (global.sink == .mem and !mem_sink.global.stats) {
        global.sink = .json;
        mem_src.run(model.root);
    }

    // Clear the screen when done.
    if (main.config.scan_ui == .line) main.handleEvent(false, true);
}

pub fn createRoot(path: []const u8, stat: *const Stat) *Dir {
    const d = main.allocator.create(Dir) catch unreachable;
    d.* = .{
        .name = main.allocator.dupe(u8, path) catch unreachable,
        .parent = null,
        .out = switch (global.sink) {
            .mem => .{ .mem = mem_sink.createRoot(path, stat) },
            .json => .{ .json = json_export.createRoot(path, stat) },
        },
    };
    return d;
}

fn drawConsole() void {
    const st = struct {
        var ansi: ?bool = null;
        var lines_written: usize = 0;
    };
    const stderr = std.io.getStdErr();
    const ansi = st.ansi orelse blk: {
        const t = stderr.supportsAnsiEscapeCodes();
        st.ansi = t;
        break :blk t;
    };

    var buf: [4096]u8 = undefined;
    var strm = std.io.fixedBufferStream(buf[0..]);
    var wr = strm.writer();

    while (ansi and st.lines_written > 0) {
        wr.writeAll("\x1b[1F\x1b[2K") catch {};
        st.lines_written -= 1;
    }

    if (global.state == .hlcnt) {
        wr.writeAll("Counting hardlinks...") catch {};
        if (model.inodes.add_total > 0)
            wr.print(" {} / {}", .{ model.inodes.add_done, model.inodes.add_total }) catch {};
        wr.writeByte('\n') catch {};
        st.lines_written += 1;
    } else if (global.state == .running) {
        var bytes: u64 = 0;
        var files: u64 = 0;
        for (global.threads) |*t| {
            bytes +|= t.getBytes();
            files += t.files_seen.load(.monotonic);
        }
        const r = ui.FmtSize.fmt(bytes);
        wr.print("{} files / {s}{s}\n", .{ files, r.num(), r.unit }) catch {};
        st.lines_written += 1;

        for (global.threads, 0..) |*t, i| {
            const dir = blk: {
                t.lock.lock();
                defer t.lock.unlock();
                break :blk if (t.current_dir) |d| d.path() else null;
            };
            wr.print(" #{}: {s}\n", .{ i + 1, ui.shorten(ui.toUtf8(dir orelse "(waiting)"), 73) }) catch {};
            st.lines_written += 1;
            if (dir) |p| main.allocator.free(p);
        }
    }
    stderr.writeAll(strm.getWritten()) catch {};
}

fn drawProgress() void {
    const st = struct { var animation_pos: usize = 0; };

    var bytes: u64 = 0;
    var files: u64 = 0;
    for (global.threads) |*t| {
        bytes +|= t.getBytes();
        files += t.files_seen.load(.monotonic);
    }

    ui.init();
    const width = ui.cols -| 5;
    const numthreads: u32 = @intCast(@min(global.threads.len, @max(1, ui.rows -| 10)));
    const box = ui.Box.create(8 + numthreads, width, "Scanning...");
    box.move(2, 2);
    ui.addstr("Total items: ");
    ui.addnum(.default, files);

    if (width > 48) {
        box.move(2, 30);
        ui.addstr("size: ");
        ui.addsize(.default, bytes);
    }

    for (0..numthreads) |i| {
        box.move(3 + @as(u32, @intCast(i)), 4);
        const dir = blk: {
            const t = &global.threads[i];
            t.lock.lock();
            defer t.lock.unlock();
            break :blk if (t.current_dir) |d| d.path() else null;
        };
        ui.addstr(ui.shorten(ui.toUtf8(dir orelse "(waiting)"), width -| 6));
        if (dir) |p| main.allocator.free(p);
    }

    blk: {
        global.last_error_lock.lock();
        defer global.last_error_lock.unlock();
        const err = global.last_error orelse break :blk;
        box.move(4 + numthreads, 2);
        ui.style(.bold);
        ui.addstr("Warning: ");
        ui.style(.default);
        ui.addstr("error scanning ");
        ui.addstr(ui.shorten(ui.toUtf8(err), width -| 28));
        box.move(5 + numthreads, 3);
        ui.addstr("some directory sizes may not be correct.");
    }

    if (global.need_confirm_quit) {
        box.move(6 + numthreads, width -| 20);
        ui.addstr("Press ");
        ui.style(.key);
        ui.addch('y');
        ui.style(.default);
        ui.addstr(" to confirm");
    } else {
        box.move(6 + numthreads, width -| 18);
        ui.addstr("Press ");
        ui.style(.key);
        ui.addch('q');
        ui.style(.default);
        ui.addstr(" to abort");
    }

    if (main.config.update_delay < std.time.ns_per_s and width > 40) {
        const txt = "Scanning...";
        st.animation_pos += 1;
        if (st.animation_pos >= txt.len * 2) st.animation_pos = 0;
        if (st.animation_pos < txt.len) {
            box.move(6 + numthreads, 2);
            for (txt[0..st.animation_pos + 1]) |t| ui.addch(t);
        } else {
            var i: u32 = txt.len - 1;
            while (i > st.animation_pos - txt.len) : (i -= 1) {
                box.move(6 + numthreads, 2 + i);
                ui.addch(txt[i]);
            }
        }
    }
}

fn drawError() void {
    const width = ui.cols -| 5;
    const box = ui.Box.create(6, width, "Scan error");
    box.move(2, 2);
    ui.addstr("Unable to open directory:");
    box.move(3, 4);
    ui.addstr(ui.shorten(ui.toUtf8(global.last_error.?), width -| 10));
    box.move(4, width -| 27);
    ui.addstr("Press any key to continue");
}

fn drawMessage(msg: []const u8) void {
    const width = ui.cols -| 5;
    const box = ui.Box.create(4, width, "Scan error");
    box.move(2, 2);
    ui.addstr(msg);
}

pub fn draw() void {
    switch (main.config.scan_ui.?) {
        .none => {},
        .line => drawConsole(),
        .full => switch (global.state) {
            .done => {},
            .err => drawError(),
            .zeroing => {
                const box = ui.Box.create(4, ui.cols -| 5, "Initializing");
                box.move(2, 2);
                ui.addstr("Clearing directory counts...");
            },
            .hlcnt => {
                const box = ui.Box.create(4, ui.cols -| 5, "Finalizing");
                box.move(2, 2);
                ui.addstr("Counting hardlinks... ");
                if (model.inodes.add_total > 0) {
                    ui.addnum(.default, model.inodes.add_done);
                    ui.addstr(" / ");
                    ui.addnum(.default, model.inodes.add_total);
                }
            },
            .running => drawProgress(),
        },
    }
}

pub fn keyInput(ch: i32) void {
    switch (global.state) {
        .done => {},
        .err => main.state = .browse,
        .zeroing => {},
        .hlcnt => {},
        .running => {
            switch (ch) {
                'q' => {
                    if (main.config.confirm_quit) global.need_confirm_quit = !global.need_confirm_quit
                    else ui.quit();
                },
                'y', 'Y' => if (global.need_confirm_quit) ui.quit(),
                else => global.need_confirm_quit = false,
            }
        },
    }
}