diff --git a/src/bin_export.zig b/src/bin_export.zig
index 8fb433b..0767fff 100644
--- a/src/bin_export.zig
+++ b/src/bin_export.zig
@@ -20,7 +20,6 @@ pub const global = struct {
 };
 
 const BLOCK_SIZE: usize = 64*1024;
-const COMPRESSED_SIZE: usize = 65824; // ZSTD_COMPRESSBOUND(BLOCK_SIZE)
 
 pub const SIGNATURE = "\xbfncduEX1";
 
@@ -66,13 +65,32 @@ inline fn blockHeader(id: u4, len: u28) [4]u8 { return bigu32((@as(u32, id) << 2
 inline fn cborByte(major: CborMajor, arg: u5) u8 { return (@as(u8, @intFromEnum(major)) << 5) | arg; }
 
 
+// ZSTD_COMPRESSBOUND(), assuming input does not exceed ZSTD_MAX_INPUT_SIZE
+fn compressBound(size: usize) usize { return size + (size>>8) + (if (size < (128<<10)) ((128<<10) - size) >> 11 else 0); }
+
+
+// (Uncompressed) data block size.
+// Start with 64k, then use increasingly larger block sizes as the export file
+// grows. This is both to stay within the block number limit of the index block
+// and because, with a larger index block, the reader will end up using more
+// memory anyway.
+fn blockSize(num: u32) usize {
+    //                       block size   uncompressed data in this num range
+    //             # mil          # KiB   # GiB
+    return if (num < ( 1<<20))   64<<10 //    64
+      else if (num < ( 2<<20))  128<<10 //   128
+      else if (num < ( 4<<20))  256<<10 //   512
+      else if (num < ( 8<<20))  512<<10 //  2048
+      else if (num < (16<<20)) 1024<<10 //  8192
+      else                    2048<<10; // 32768
+}
+
 pub const Thread = struct {
     buf: []u8 = undefined,
-    off: usize = BLOCK_SIZE,
+    off: usize = std.math.maxInt(usize) - (1<<10), // large number to trigger a flush() for the first write
     block_num: u32 = std.math.maxInt(u32),
     itemref: u64 = 0, // ref of item currently being written
-    tmp: []u8 = undefined, // Temporary buffer for headers and compression.
 
     // unused, but kept around for easy debugging
     fn compressNone(in: []const u8, out: []u8) usize {
@@ -88,21 +106,26 @@ pub const Thread = struct {
         }
     }
 
-    fn createBlock(t: *Thread) []const u8 {
-        if (t.block_num == std.math.maxInt(u32) or t.off <= 1) return "";
+    fn createBlock(t: *Thread) std.ArrayList(u8) {
+        var out = std.ArrayList(u8).init(main.allocator);
+        if (t.block_num == std.math.maxInt(u32) or t.off == 0) return out;
 
-        const bodylen = compressZstd(t.buf[0..t.off], t.tmp[12..]);
-        const blocklen: u28 = @intCast(bodylen + 16);
-        t.tmp[0..4].* = blockHeader(0, blocklen);
-        t.tmp[4..8].* = bigu32(t.block_num);
-        t.tmp[8..12].* = bigu32(@intCast(t.off));
-        t.tmp[12+bodylen..][0..4].* = blockHeader(0, blocklen);
-        return t.tmp[0..blocklen];
+        out.ensureTotalCapacityPrecise(16 + compressBound(t.off)) catch unreachable;
+        out.items.len = out.capacity;
+        const bodylen = compressZstd(t.buf[0..t.off], out.items[12..]);
+        out.items.len = 16 + bodylen;
+
+        out.items[0..4].* = blockHeader(0, @intCast(out.items.len));
+        out.items[4..8].* = bigu32(t.block_num);
+        out.items[8..12].* = bigu32(@intCast(t.off));
+        out.items[12+bodylen..][0..4].* = blockHeader(0, @intCast(out.items.len));
+        return out;
     }
 
     fn flush(t: *Thread, expected_len: usize) void {
         @setCold(true);
         const block = createBlock(t);
+        defer block.deinit();
 
         global.lock.lock();
         defer global.lock.unlock();
@@ -110,11 +133,11 @@ pub const Thread = struct {
         // in which case we would probably have error'ed out earlier anyway.
         if (expected_len > t.buf.len) ui.die("Error writing data: path too long.\n", .{});
 
-        if (block.len > 0) {
+        if (block.items.len > 0) {
             if (global.file_off >= (1<<40)) ui.die("Export data file has grown too large, please report a bug.\n", .{});
-            global.index.items[4..][t.block_num*8..][0..8].* = bigu64((global.file_off << 24) + block.len);
-            global.file_off += block.len;
-            global.fd.writeAll(block) catch |e|
+            global.index.items[4..][t.block_num*8..][0..8].* = bigu64((global.file_off << 24) + block.items.len);
+            global.file_off += block.items.len;
+            global.fd.writeAll(block.items) catch |e|
                 ui.die("Error writing to file: {s}.\n", .{ ui.errorString(e) });
         }
 
@@ -122,6 +145,9 @@ pub const Thread = struct {
         t.block_num = @intCast((global.index.items.len - 4) / 8);
         global.index.appendSlice(&[1]u8{0}**8) catch unreachable;
         if (global.index.items.len + 12 >= (1<<28)) ui.die("Too many data blocks, please report a bug.\n", .{});
+
+        const newsize = blockSize(t.block_num);
+        if (t.buf.len != newsize) t.buf = main.allocator.realloc(t.buf, newsize) catch unreachable;
     }
 
     fn cborHead(t: *Thread, major: CborMajor, arg: u64) void {
@@ -410,7 +436,6 @@ pub const Dir = struct {
 pub fn createRoot(stat: *const sink.Stat, threads: []sink.Thread) Dir {
     for (threads) |*t| {
         t.sink.bin.buf = main.allocator.alloc(u8, BLOCK_SIZE) catch unreachable;
-        t.sink.bin.tmp = main.allocator.alloc(u8, COMPRESSED_SIZE) catch unreachable;
     }
 
     return .{ .stat = stat.* };
@@ -420,7 +445,6 @@ pub fn done(threads: []sink.Thread) void {
     for (threads) |*t| {
         t.sink.bin.flush(0);
         main.allocator.free(t.sink.bin.buf);
-        main.allocator.free(t.sink.bin.tmp);
     }
 
     while (std.mem.endsWith(u8, global.index.items, &[1]u8{0}**8))