bin_export: Adaptively adjust block size

Yorhel 2024-08-11 10:56:41 +02:00
parent c30699f93b
commit 4ef9c3e817


@@ -20,7 +20,6 @@ pub const global = struct {
};
const BLOCK_SIZE: usize = 64*1024;
const COMPRESSED_SIZE: usize = 65824; // ZSTD_COMPRESSBOUND(BLOCK_SIZE)
pub const SIGNATURE = "\xbfncduEX1";
@@ -66,13 +65,32 @@ inline fn blockHeader(id: u4, len: u28) [4]u8 { return bigu32((@as(u32, id) << 2
inline fn cborByte(major: CborMajor, arg: u5) u8 { return (@as(u8, @intFromEnum(major)) << 5) | arg; }
// ZSTD_COMPRESSBOUND(), assuming input does not exceed ZSTD_MAX_INPUT_SIZE
fn compressBound(size: usize) usize { return size + (size>>8) + (if (size < (128<<10)) ((128<<10) - size) >> 11 else 0); }
// (Uncompressed) data block size.
// Start with 64k, then use increasingly larger block sizes as the export file
// grows. This is both to stay within the block number limit of the index block
// and because, with a larger index block, the reader will end up using more
// memory anyway.
fn blockSize(num: u32) usize {
    //                        block size   uncompressed data in this num range
    //             # mil           # KiB            # GiB
    return if (num < ( 1<<20))    64<<10   //          64
      else if (num < ( 2<<20))   128<<10   //         128
      else if (num < ( 4<<20))   256<<10   //         512
      else if (num < ( 8<<20))   512<<10   //        2048
      else if (num < (16<<20))  1024<<10   //        8192
      else                      2048<<10;  //       32768
}
pub const Thread = struct {
buf: []u8 = undefined,
off: usize = BLOCK_SIZE,
off: usize = std.math.maxInt(usize) - (1<<10), // large number to trigger a flush() for the first write
block_num: u32 = std.math.maxInt(u32),
itemref: u64 = 0, // ref of item currently being written
tmp: []u8 = undefined, // Temporary buffer for headers and compression.
// unused, but kept around for easy debugging
fn compressNone(in: []const u8, out: []u8) usize {
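
The compressBound() helper added above takes over from the fixed COMPRESSED_SIZE constant dropped in the first hunk, since the worst-case compressed size now has to be computed per block. A minimal standalone sketch (not part of the commit; both functions are copied from the hunk above so they can be exercised in isolation) that checks the formula against the old constant and spot-checks the new size schedule:

const std = @import("std");

// Copied from the diff: ZSTD_COMPRESSBOUND(), the worst-case compressed size for a given input.
fn compressBound(size: usize) usize {
    return size + (size >> 8) + (if (size < (128 << 10)) ((128 << 10) - size) >> 11 else 0);
}

// Copied from the diff: uncompressed block size as a function of the block number.
fn blockSize(num: u32) usize {
    return if (num < (1 << 20)) 64 << 10
      else if (num < (2 << 20)) 128 << 10
      else if (num < (4 << 20)) 256 << 10
      else if (num < (8 << 20)) 512 << 10
      else if (num < (16 << 20)) 1024 << 10
      else 2048 << 10;
}

test "compressBound(64 KiB) matches the old COMPRESSED_SIZE" {
    // 65824 is the ZSTD_COMPRESSBOUND(64*1024) value the dropped constant hard-coded.
    try std.testing.expectEqual(@as(usize, 65824), compressBound(64 * 1024));
}

test "block size grows at the tier boundaries" {
    try std.testing.expectEqual(@as(usize, 64 << 10), blockSize(0));
    try std.testing.expectEqual(@as(usize, 128 << 10), blockSize(1 << 20));
    try std.testing.expectEqual(@as(usize, 2048 << 10), blockSize(16 << 20));
}
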
@@ -88,21 +106,26 @@ pub const Thread = struct {
}
}
fn createBlock(t: *Thread) []const u8 {
if (t.block_num == std.math.maxInt(u32) or t.off <= 1) return "";
fn createBlock(t: *Thread) std.ArrayList(u8) {
var out = std.ArrayList(u8).init(main.allocator);
if (t.block_num == std.math.maxInt(u32) or t.off == 0) return out;
const bodylen = compressZstd(t.buf[0..t.off], t.tmp[12..]);
const blocklen: u28 = @intCast(bodylen + 16);
t.tmp[0..4].* = blockHeader(0, blocklen);
t.tmp[4..8].* = bigu32(t.block_num);
t.tmp[8..12].* = bigu32(@intCast(t.off));
t.tmp[12+bodylen..][0..4].* = blockHeader(0, blocklen);
return t.tmp[0..blocklen];
out.ensureTotalCapacityPrecise(16 + compressBound(t.off)) catch unreachable;
out.items.len = out.capacity;
const bodylen = compressZstd(t.buf[0..t.off], out.items[12..]);
out.items.len = 16 + bodylen;
out.items[0..4].* = blockHeader(0, @intCast(out.items.len));
out.items[4..8].* = bigu32(t.block_num);
out.items[8..12].* = bigu32(@intCast(t.off));
out.items[12+bodylen..][0..4].* = blockHeader(0, @intCast(out.items.len));
return out;
}
fn flush(t: *Thread, expected_len: usize) void {
@setCold(true);
const block = createBlock(t);
defer block.deinit();
global.lock.lock();
defer global.lock.unlock();
@@ -110,11 +133,11 @@ pub const Thread = struct {
// in which case we would probably have error'ed out earlier anyway.
if (expected_len > t.buf.len) ui.die("Error writing data: path too long.\n", .{});
if (block.len > 0) {
if (block.items.len > 0) {
if (global.file_off >= (1<<40)) ui.die("Export data file has grown too large, please report a bug.\n", .{});
global.index.items[4..][t.block_num*8..][0..8].* = bigu64((global.file_off << 24) + block.len);
global.file_off += block.len;
global.fd.writeAll(block) catch |e|
global.index.items[4..][t.block_num*8..][0..8].* = bigu64((global.file_off << 24) + block.items.len);
global.file_off += block.items.len;
global.fd.writeAll(block.items) catch |e|
ui.die("Error writing to file: {s}.\n", .{ ui.errorString(e) });
}
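
The index entry written above packs a block's file offset and its on-disk length into one u64: (global.file_off << 24) + block.items.len leaves 40 bits for the offset (hence the 1<<40 check) and 24 bits for the length. A hedged reader-side sketch of the inverse, assuming only what the diff shows and that bigu64() produces big-endian bytes as its name suggests; IndexEntry and parseIndexEntry are illustrative names, not ncdu's actual import API:

const std = @import("std");

const IndexEntry = struct { file_off: u64, block_len: u32 };

// Inverse of the packing above: the upper 40 bits hold the file offset,
// the lower 24 bits the stored length of the block.
fn parseIndexEntry(raw: [8]u8) IndexEntry {
    const v = std.mem.readInt(u64, &raw, .big);
    return .{
        .file_off = v >> 24,
        .block_len = @intCast(v & ((1 << 24) - 1)),
    };
}

test "index entry round-trip" {
    const file_off: u64 = 123456;
    const block_len: u32 = 65840; // worst case for a 64 KiB block: compressBound(64*1024) + 16 framing bytes
    var raw: [8]u8 = undefined;
    std.mem.writeInt(u64, &raw, (file_off << 24) + block_len, .big);
    const e = parseIndexEntry(raw);
    try std.testing.expectEqual(file_off, e.file_off);
    try std.testing.expectEqual(block_len, e.block_len);
}
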
@@ -122,6 +145,9 @@ pub const Thread = struct {
t.block_num = @intCast((global.index.items.len - 4) / 8);
global.index.appendSlice(&[1]u8{0}**8) catch unreachable;
if (global.index.items.len + 12 >= (1<<28)) ui.die("Too many data blocks, please report a bug.\n", .{});
const newsize = blockSize(t.block_num);
if (t.buf.len != newsize) t.buf = main.allocator.realloc(t.buf, newsize) catch unreachable;
}
fn cborHead(t: *Thread, major: CborMajor, arg: u64) void {
@@ -410,7 +436,6 @@ pub const Dir = struct {
pub fn createRoot(stat: *const sink.Stat, threads: []sink.Thread) Dir {
for (threads) |*t| {
t.sink.bin.buf = main.allocator.alloc(u8, BLOCK_SIZE) catch unreachable;
t.sink.bin.tmp = main.allocator.alloc(u8, COMPRESSED_SIZE) catch unreachable;
}
return .{ .stat = stat.* };
@@ -420,7 +445,6 @@ pub fn done(threads: []sink.Thread) void {
for (threads) |*t| {
t.sink.bin.flush(0);
main.allocator.free(t.sink.bin.buf);
main.allocator.free(t.sink.bin.tmp);
}
while (std.mem.endsWith(u8, global.index.items, &[1]u8{0}**8))
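
As a rough capacity check derived from the numbers in this diff: the index block (a 4-byte header plus 8 bytes per data block) must stay under 1<<28 bytes, which allows roughly (2^28 - 16) / 8, about 33.5 million blocks, and the data file offset must stay below 1<<40, i.e. 1 TiB of compressed output. Per the table in blockSize(), the first 16 Mi blocks cover 64 + 128 + 512 + 2048 + 8192 = 10944 GiB of uncompressed data, and every block beyond that adds another 2 MiB.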