mirror of
https://codeberg.org/ziglang/zig.git
synced 2025-12-06 13:54:21 +00:00
Use std.Io.Select instead of std.Io.Group
We want to keep the number of tasks under control, so Io.Select seems to be a better tool for it.
This commit is contained in:
parent
2994a206ae
commit
231978893a
1 changed files with 83 additions and 44 deletions
|
|
@ -734,46 +734,59 @@ fn ktSingleThreaded(comptime Variant: type, view: *const MultiSliceView, total_l
|
|||
final_state.final(output);
|
||||
}
|
||||
|
||||
/// Context for a thread task that processes leaves using SIMD
|
||||
fn LeafThreadContext(comptime Variant: type) type {
|
||||
fn BatchResult(comptime Variant: type) type {
|
||||
const cv_size = Variant.cv_size;
|
||||
const leaves_per_batch = bytes_per_batch / chunk_size;
|
||||
const max_cvs_size = leaves_per_batch * cv_size;
|
||||
|
||||
return struct {
|
||||
batch_idx: usize,
|
||||
cv_len: usize,
|
||||
cvs: [max_cvs_size]u8,
|
||||
};
|
||||
}
|
||||
|
||||
fn SelectLeafContext(comptime Variant: type) type {
|
||||
const cv_size = Variant.cv_size;
|
||||
const Result = BatchResult(Variant);
|
||||
|
||||
return struct {
|
||||
view: *const MultiSliceView,
|
||||
start_offset: usize, // Byte offset in view (after first chunk)
|
||||
num_leaves: usize, // Number of leaves to process
|
||||
output_cvs: []align(@alignOf(u64)) u8, // Where to store CVs
|
||||
batch_idx: usize,
|
||||
start_offset: usize,
|
||||
num_leaves: usize,
|
||||
|
||||
fn process(ctx: @This()) void {
|
||||
const cv_size = Variant.cv_size;
|
||||
fn process(ctx: @This()) Result {
|
||||
var result: Result = .{
|
||||
.batch_idx = ctx.batch_idx,
|
||||
.cv_len = ctx.num_leaves * cv_size,
|
||||
.cvs = undefined,
|
||||
};
|
||||
|
||||
// Thread-local scratch buffer for copying data that spans slice boundaries
|
||||
var leaf_buffer: [bytes_per_batch]u8 align(cache_line_size) = undefined;
|
||||
|
||||
var leaves_processed: usize = 0;
|
||||
var byte_offset = ctx.start_offset;
|
||||
var cv_offset: usize = 0;
|
||||
|
||||
// Process leaves in SIMD batches
|
||||
const simd_batch_bytes = optimal_vector_len * chunk_size;
|
||||
while (leaves_processed + optimal_vector_len <= ctx.num_leaves) {
|
||||
if (ctx.view.tryGetSlice(byte_offset, byte_offset + simd_batch_bytes)) |leaf_data| {
|
||||
var leaf_cvs: [optimal_vector_len * Variant.cv_size]u8 = undefined;
|
||||
processLeaves(Variant, optimal_vector_len, leaf_data, &leaf_cvs);
|
||||
@memcpy(ctx.output_cvs[cv_offset..][0..leaf_cvs.len], &leaf_cvs);
|
||||
@memcpy(result.cvs[cv_offset..][0..leaf_cvs.len], &leaf_cvs);
|
||||
} else {
|
||||
ctx.view.copyRange(byte_offset, byte_offset + simd_batch_bytes, leaf_buffer[0..simd_batch_bytes]);
|
||||
var leaf_cvs: [optimal_vector_len * Variant.cv_size]u8 = undefined;
|
||||
processLeaves(Variant, optimal_vector_len, leaf_buffer[0..simd_batch_bytes], &leaf_cvs);
|
||||
@memcpy(ctx.output_cvs[cv_offset..][0..leaf_cvs.len], &leaf_cvs);
|
||||
@memcpy(result.cvs[cv_offset..][0..leaf_cvs.len], &leaf_cvs);
|
||||
}
|
||||
leaves_processed += optimal_vector_len;
|
||||
byte_offset += optimal_vector_len * chunk_size;
|
||||
cv_offset += optimal_vector_len * cv_size;
|
||||
}
|
||||
|
||||
// Process remaining leaves one at a time (should be less than optimal_vector_len)
|
||||
while (leaves_processed < ctx.num_leaves) {
|
||||
const leaf_end = byte_offset + chunk_size;
|
||||
var cv_buffer: [64]u8 = undefined; // Max CV size is 64 bytes
|
||||
var cv_buffer: [64]u8 = undefined;
|
||||
|
||||
if (ctx.view.tryGetSlice(byte_offset, leaf_end)) |leaf_data| {
|
||||
const cv_slice = MultiSliceView.init(leaf_data, &[_]u8{}, &[_]u8{});
|
||||
|
|
@ -783,22 +796,23 @@ fn LeafThreadContext(comptime Variant: type) type {
|
|||
const cv_slice = MultiSliceView.init(leaf_buffer[0..chunk_size], &[_]u8{}, &[_]u8{});
|
||||
Variant.turboShakeToBuffer(&cv_slice, 0x0B, cv_buffer[0..cv_size]);
|
||||
}
|
||||
@memcpy(ctx.output_cvs[cv_offset..][0..cv_size], cv_buffer[0..cv_size]);
|
||||
@memcpy(result.cvs[cv_offset..][0..cv_size], cv_buffer[0..cv_size]);
|
||||
|
||||
leaves_processed += 1;
|
||||
byte_offset += chunk_size;
|
||||
cv_offset += cv_size;
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/// Context for the final partial leaf (may be smaller than chunk_size)
|
||||
fn FinalLeafContext(comptime Variant: type) type {
|
||||
return struct {
|
||||
view: *const MultiSliceView,
|
||||
start_offset: usize,
|
||||
leaf_len: usize, // May be less than chunk_size
|
||||
leaf_len: usize,
|
||||
output_cv: []align(@alignOf(u64)) u8,
|
||||
|
||||
fn process(ctx: @This()) void {
|
||||
|
|
@ -819,7 +833,6 @@ fn FinalLeafContext(comptime Variant: type) type {
|
|||
};
|
||||
}
|
||||
|
||||
/// Generic multi-threaded implementation with bounded heap allocation.
|
||||
fn ktMultiThreaded(
|
||||
comptime Variant: type,
|
||||
allocator: Allocator,
|
||||
|
|
@ -853,40 +866,66 @@ fn ktMultiThreaded(
|
|||
const has_partial_leaf = (remaining_bytes % chunk_size) != 0;
|
||||
const partial_leaf_size = if (has_partial_leaf) remaining_bytes % chunk_size else 0;
|
||||
|
||||
const max_concurrent_batches = 256;
|
||||
const cvs_per_super_batch = max_concurrent_batches * leaves_per_batch * cv_size;
|
||||
if (full_leaves > 0) {
|
||||
const total_batches = std.math.divCeil(usize, full_leaves, leaves_per_batch) catch unreachable;
|
||||
const max_concurrent: usize = @min(256, total_batches);
|
||||
|
||||
const cv_buf = try allocator.alignedAlloc(u8, std.mem.Alignment.of(u64), cvs_per_super_batch);
|
||||
defer allocator.free(cv_buf);
|
||||
const Result = BatchResult(Variant);
|
||||
const SelectResult = union(enum) { batch: Result };
|
||||
const Select = Io.Select(SelectResult);
|
||||
|
||||
var leaves_processed: usize = 0;
|
||||
while (leaves_processed < full_leaves) {
|
||||
const leaves_in_super_batch = @min(max_concurrent_batches * leaves_per_batch, full_leaves - leaves_processed);
|
||||
const num_batches = std.math.divCeil(usize, leaves_in_super_batch, leaves_per_batch) catch unreachable;
|
||||
const select_buf = try allocator.alloc(SelectResult, max_concurrent);
|
||||
defer allocator.free(select_buf);
|
||||
|
||||
var group: Io.Group = .init;
|
||||
// Buffer for out-of-order results (select_buf slots get reused)
|
||||
const pending_cv_buf = try allocator.alloc([leaves_per_batch * cv_size]u8, max_concurrent);
|
||||
defer allocator.free(pending_cv_buf);
|
||||
var pending_cv_lens: [256]usize = .{0} ** 256;
|
||||
|
||||
for (0..num_batches) |batch_idx| {
|
||||
const batch_start_leaf = leaves_processed + batch_idx * leaves_per_batch;
|
||||
var select: Select = .init(io, select_buf);
|
||||
var batches_spawned: usize = 0;
|
||||
var next_to_process: usize = 0;
|
||||
|
||||
while (next_to_process < total_batches) {
|
||||
while (batches_spawned < total_batches and batches_spawned - next_to_process < max_concurrent) {
|
||||
const batch_start_leaf = batches_spawned * leaves_per_batch;
|
||||
const batch_leaves = @min(leaves_per_batch, full_leaves - batch_start_leaf);
|
||||
|
||||
if (batch_leaves == 0) break;
|
||||
|
||||
const start_offset = chunk_size + batch_start_leaf * chunk_size;
|
||||
const cv_start = batch_idx * leaves_per_batch * cv_size;
|
||||
|
||||
group.async(io, LeafThreadContext(Variant).process, .{LeafThreadContext(Variant){
|
||||
select.async(.batch, SelectLeafContext(Variant).process, .{SelectLeafContext(Variant){
|
||||
.view = view,
|
||||
.batch_idx = batches_spawned,
|
||||
.start_offset = start_offset,
|
||||
.num_leaves = batch_leaves,
|
||||
.output_cvs = @alignCast(cv_buf[cv_start..][0 .. batch_leaves * cv_size]),
|
||||
}});
|
||||
batches_spawned += 1;
|
||||
}
|
||||
|
||||
group.wait(io);
|
||||
const result = select.wait() catch unreachable;
|
||||
const batch = result.batch;
|
||||
const slot = batch.batch_idx % max_concurrent;
|
||||
|
||||
final_state.update(cv_buf[0 .. leaves_in_super_batch * cv_size]);
|
||||
leaves_processed += leaves_in_super_batch;
|
||||
if (batch.batch_idx == next_to_process) {
|
||||
final_state.update(batch.cvs[0..batch.cv_len]);
|
||||
next_to_process += 1;
|
||||
|
||||
// Drain pending batches that are now ready
|
||||
while (next_to_process < total_batches) {
|
||||
const pending_slot = next_to_process % max_concurrent;
|
||||
const pending_len = pending_cv_lens[pending_slot];
|
||||
if (pending_len == 0) break;
|
||||
|
||||
final_state.update(pending_cv_buf[pending_slot][0..pending_len]);
|
||||
pending_cv_lens[pending_slot] = 0;
|
||||
next_to_process += 1;
|
||||
}
|
||||
} else {
|
||||
@memcpy(pending_cv_buf[slot][0..batch.cv_len], batch.cvs[0..batch.cv_len]);
|
||||
pending_cv_lens[slot] = batch.cv_len;
|
||||
}
|
||||
}
|
||||
|
||||
select.group.wait(io);
|
||||
}
|
||||
|
||||
if (has_partial_leaf) {
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue