Mirror of https://codeberg.org/ziglang/zig.git (synced 2025-12-06 05:44:20 +00:00)
Rewrite ktMultiThreaded
This commit is contained in:
parent c3663ca553
commit 632e7cd8ef

1 changed file with 180 additions and 62 deletions
@@ -14,13 +14,13 @@ const cache_line_size = std.atomic.cache_line;
 // Optimal SIMD vector length for u64 on this target platform
 const optimal_vector_len = std.simd.suggestVectorLength(u64) orelse 1;
 
+// Number of bytes processed per SIMD batch in multi-threaded mode
+const bytes_per_batch = 256 * 1024;
+
 // Multi-threading threshold: inputs larger than this will use parallel processing.
 // Benchmarked optimal value for ReleaseFast mode.
 const large_file_threshold: usize = 2 * 1024 * 1024; // 2 MB
 
-// Number of chunks each thread processes in parallel mode (default: 8)
-const batch_count: usize = 8;
-
 // Round constants for Keccak-p[1600,12]
 const RC = [12]u64{
     0x000000008000808B,
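Note on the new constants: bytes_per_batch replaces the old per-thread batch_count, so work is now split by bytes rather than by a fixed chunk count. A minimal standalone sketch of how the constants relate; the chunk_size of 8192 bytes (KangarooTwelve's leaf size) is an assumption here, since it is defined elsewhere in the file and not shown in this hunk:

const std = @import("std");

// Assumed from the surrounding file: KangarooTwelve uses 8 KiB leaves.
const chunk_size = 8192;
const optimal_vector_len = std.simd.suggestVectorLength(u64) orelse 1;
const bytes_per_batch = 256 * 1024;

test "bytes_per_batch is a whole number of SIMD batches" {
    // Each SIMD step hashes optimal_vector_len leaves at once, so a thread
    // batch must cover a whole number of SIMD steps and a whole number of leaves.
    try std.testing.expect(bytes_per_batch % (optimal_vector_len * chunk_size) == 0);
    // A 256 KiB batch therefore covers 32 leaves of 8 KiB each.
    try std.testing.expect(bytes_per_batch / chunk_size == 32);
}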
@@ -214,7 +214,7 @@ const MultiSliceView = struct {
     }
 };
 
-/// Apply Keccak-p[1600,12] to N states in parallel
+/// Apply Keccak-p[1600,12] to N states using SIMD
 fn keccakP1600timesN(comptime N: usize, states: *[5][5]@Vector(N, u64)) void {
     @setEvalBranchQuota(10000);
 
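For context on the states parameter: each @Vector(N, u64) packs the same lane from N independent Keccak states, so a single vector instruction advances all N states at once. A hypothetical sketch of the idea (not from this commit; the real round function and its lane indexing live in keccakP1600timesN), shown for the theta step's column parities:

const std = @import("std");

// Hypothetical helper: compute the five theta column parities C[x] for
// N interleaved Keccak states. One vector XOR processes N states at once.
fn thetaParities(comptime N: usize, states: *const [5][5]@Vector(N, u64)) [5]@Vector(N, u64) {
    var c: [5]@Vector(N, u64) = undefined;
    inline for (0..5) |x| {
        c[x] = states[x][0];
        inline for (1..5) |y| {
            c[x] ^= states[x][y];
        }
    }
    return c;
}

test "column parity across N interleaved states" {
    var states = std.mem.zeroes([5][5]@Vector(4, u64));
    states[0][0] = @splat(1); // lane (0,0) of all four states
    const c = thetaParities(4, &states);
    try std.testing.expectEqual(@as(u64, 1), c[0][0]);
}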
@@ -291,7 +291,7 @@ fn keccakP1600timesN(comptime N: usize, states: *[5][5]@Vector(N, u64)) void {
     }
 }
 
-/// Add lanes from data to N states in parallel with stride - optimized version
+/// Add lanes from data to N states in parallel with stride using SIMD
 fn addLanesAll(
     comptime N: usize,
     states: *[5][5]@Vector(N, u64),
@@ -306,7 +306,6 @@ fn addLanesAll(
         const x = xy % 5;
         const y = xy / 5;
 
-        // Load N lanes with stride - optimized memory access pattern
         var loaded_data: @Vector(N, u64) = undefined;
         inline for (0..N) |i| {
            loaded_data[i] = load64(data[8 * (i * lane_offset + xy) ..]);
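The indexing above assumes the N inputs lie back to back, lane_offset lanes apart, so lane xy of input i starts at byte 8 * (i * lane_offset + xy). A small sketch of that addressing, with a stand-in load64 assumed to match the little-endian read the file uses:

const std = @import("std");

// Assumed little-endian lane load, matching the load64 the diff relies on.
fn load64(bytes: []const u8) u64 {
    return std.mem.readInt(u64, bytes[0..8], .little);
}

test "strided lane addressing" {
    // Two inputs of 4 lanes each (lane_offset = 4), laid out back to back.
    const lane_offset = 4;
    var data: [2 * 4 * 8]u8 = undefined;
    for (0..2) |i| {
        for (0..4) |xy| {
            std.mem.writeInt(u64, data[8 * (i * lane_offset + xy) ..][0..8], @intCast(i * 10 + xy), .little);
        }
    }
    // Lane 2 of input 1 sits at byte offset 8 * (1 * 4 + 2).
    try std.testing.expectEqual(@as(u64, 12), load64(data[8 * (1 * lane_offset + 2) ..]));
}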
@ -735,6 +734,91 @@ fn ktSingleThreaded(comptime Variant: type, view: *const MultiSliceView, total_l
|
|||
final_state.final(output);
|
||||
}
|
||||
|
||||
/// Context for a thread task that processes leaves using SIMD
|
||||
fn LeafThreadContext(comptime Variant: type) type {
|
||||
return struct {
|
||||
view: *const MultiSliceView,
|
||||
start_offset: usize, // Byte offset in view (after first chunk)
|
||||
num_leaves: usize, // Number of leaves to process
|
||||
output_cvs: []align(@alignOf(u64)) u8, // Where to store CVs
|
||||
|
||||
fn process(ctx: @This()) void {
|
||||
const cv_size = Variant.cv_size;
|
||||
|
||||
// Thread-local scratch buffer for copying data that spans slice boundaries
|
||||
var leaf_buffer: [bytes_per_batch]u8 align(cache_line_size) = undefined;
|
||||
|
||||
var leaves_processed: usize = 0;
|
||||
var byte_offset = ctx.start_offset;
|
||||
var cv_offset: usize = 0;
|
||||
|
||||
// Process leaves in SIMD batches
|
||||
const simd_batch_bytes = optimal_vector_len * chunk_size;
|
||||
while (leaves_processed + optimal_vector_len <= ctx.num_leaves) {
|
||||
if (ctx.view.tryGetSlice(byte_offset, byte_offset + simd_batch_bytes)) |leaf_data| {
|
||||
var leaf_cvs: [optimal_vector_len * Variant.cv_size]u8 = undefined;
|
||||
processLeaves(Variant, optimal_vector_len, leaf_data, &leaf_cvs);
|
||||
@memcpy(ctx.output_cvs[cv_offset..][0..leaf_cvs.len], &leaf_cvs);
|
||||
} else {
|
||||
ctx.view.copyRange(byte_offset, byte_offset + simd_batch_bytes, leaf_buffer[0..simd_batch_bytes]);
|
||||
var leaf_cvs: [optimal_vector_len * Variant.cv_size]u8 = undefined;
|
||||
processLeaves(Variant, optimal_vector_len, leaf_buffer[0..simd_batch_bytes], &leaf_cvs);
|
||||
@memcpy(ctx.output_cvs[cv_offset..][0..leaf_cvs.len], &leaf_cvs);
|
||||
}
|
||||
leaves_processed += optimal_vector_len;
|
||||
byte_offset += optimal_vector_len * chunk_size;
|
||||
cv_offset += optimal_vector_len * cv_size;
|
||||
}
|
||||
|
||||
// Process remaining leaves one at a time (should be less than optimal_vector_len)
|
||||
while (leaves_processed < ctx.num_leaves) {
|
||||
const leaf_end = byte_offset + chunk_size;
|
||||
var cv_buffer: [64]u8 = undefined; // Max CV size is 64 bytes
|
||||
|
||||
if (ctx.view.tryGetSlice(byte_offset, leaf_end)) |leaf_data| {
|
||||
const cv_slice = MultiSliceView.init(leaf_data, &[_]u8{}, &[_]u8{});
|
||||
Variant.turboShakeToBuffer(&cv_slice, 0x0B, cv_buffer[0..cv_size]);
|
||||
} else {
|
||||
ctx.view.copyRange(byte_offset, leaf_end, leaf_buffer[0..chunk_size]);
|
||||
const cv_slice = MultiSliceView.init(leaf_buffer[0..chunk_size], &[_]u8{}, &[_]u8{});
|
||||
Variant.turboShakeToBuffer(&cv_slice, 0x0B, cv_buffer[0..cv_size]);
|
||||
}
|
||||
@memcpy(ctx.output_cvs[cv_offset..][0..cv_size], cv_buffer[0..cv_size]);
|
||||
|
||||
leaves_processed += 1;
|
||||
byte_offset += chunk_size;
|
||||
cv_offset += cv_size;
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/// Context for the final partial leaf (may be smaller than chunk_size)
|
||||
fn FinalLeafContext(comptime Variant: type) type {
|
||||
return struct {
|
||||
view: *const MultiSliceView,
|
||||
start_offset: usize,
|
||||
leaf_len: usize, // May be less than chunk_size
|
||||
output_cv: []align(@alignOf(u64)) u8,
|
||||
|
||||
fn process(ctx: @This()) void {
|
||||
const cv_size = Variant.cv_size;
|
||||
var leaf_buffer: [chunk_size]u8 = undefined;
|
||||
var cv_buffer: [64]u8 = undefined;
|
||||
|
||||
if (ctx.view.tryGetSlice(ctx.start_offset, ctx.start_offset + ctx.leaf_len)) |leaf_data| {
|
||||
const cv_slice = MultiSliceView.init(leaf_data, &[_]u8{}, &[_]u8{});
|
||||
Variant.turboShakeToBuffer(&cv_slice, 0x0B, cv_buffer[0..cv_size]);
|
||||
} else {
|
||||
ctx.view.copyRange(ctx.start_offset, ctx.start_offset + ctx.leaf_len, leaf_buffer[0..ctx.leaf_len]);
|
||||
const cv_slice = MultiSliceView.init(leaf_buffer[0..ctx.leaf_len], &[_]u8{}, &[_]u8{});
|
||||
Variant.turboShakeToBuffer(&cv_slice, 0x0B, cv_buffer[0..cv_size]);
|
||||
}
|
||||
@memcpy(ctx.output_cv[0..cv_size], cv_buffer[0..cv_size]);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/// Generic multi-threaded implementation
|
||||
fn ktMultiThreaded(
|
||||
comptime Variant: type,
|
||||
|
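The next hunk splits everything after the first chunk into bytes_per_batch-sized tasks plus a remainder task and an optional partial leaf. A worked sketch of that arithmetic, under the same assumed chunk_size of 8192:

const std = @import("std");

const chunk_size = 8192; // assumed KangarooTwelve leaf size
const bytes_per_batch = 256 * 1024;

test "leaf partitioning for a 10 MiB + 100 byte input" {
    const total_len: usize = 10 * 1024 * 1024 + 100;
    const remaining_bytes = total_len - chunk_size;
    const total_leaves = std.math.divCeil(usize, remaining_bytes, chunk_size) catch unreachable;
    const full_leaves = remaining_bytes / chunk_size;
    const has_partial_leaf = (remaining_bytes % chunk_size) != 0;
    const leaves_per_batch = bytes_per_batch / chunk_size; // 32
    const num_full_tasks = full_leaves / leaves_per_batch;
    const remaining_full_leaves = full_leaves % leaves_per_batch;

    // 10 MiB + 100 bytes = 1 first chunk + 1279 full leaves + 1 partial leaf.
    try std.testing.expectEqual(@as(usize, 1280), total_leaves);
    try std.testing.expectEqual(@as(usize, 1279), full_leaves);
    try std.testing.expect(has_partial_leaf);
    // 39 full SIMD-batch tasks of 32 leaves, plus a 31-leaf remainder task.
    try std.testing.expectEqual(@as(usize, 39), num_full_tasks);
    try std.testing.expectEqual(@as(usize, 31), remaining_full_leaves);
}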
@@ -744,76 +828,110 @@ fn ktMultiThreaded(
     total_len: usize,
     output: []u8,
 ) !void {
+    comptime std.debug.assert(bytes_per_batch % (optimal_vector_len * chunk_size) == 0);
+
     const cv_size = Variant.cv_size;
 
-    // Calculate total number of leaves
-    const total_leaves: usize = (total_len - 1) / chunk_size;
+    // Calculate total leaves after the first chunk
+    const remaining_bytes = total_len - chunk_size;
+    const total_leaves = std.math.divCeil(usize, remaining_bytes, chunk_size) catch unreachable;
 
-    // Allocate buffer for all chaining values
-    const cvs = try allocator.alignedAlloc(u8, std.mem.Alignment.of(u64), total_leaves * cv_size);
-    defer allocator.free(cvs);
+    // Pre-compute suffix: right_encode(n) || terminator
+    const n_enc = rightEncode(total_leaves);
+    const terminator = [_]u8{ 0xFF, 0xFF };
+    const padding = [_]u8{ 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 };
 
-    // Calculate number of threads needed based on batch_count
-    // Each thread processes at most batch_count chunks
-    const num_threads = std.math.divCeil(usize, total_leaves, batch_count) catch unreachable;
+    // Try to get the first chunk as a contiguous slice, avoiding a copy by taking advantage of MultiSliceView instead.
+    const first_chunk_direct = view.tryGetSlice(0, chunk_size);
 
-    // Pre-allocate scratch buffers for all threads (8 leaves)
-    const scratch_size = 8 * chunk_size;
-    const all_scratch = try allocator.alloc(u8, num_threads * scratch_size);
-    defer allocator.free(all_scratch);
+    // Calculate buffer size - skip first chunk if we can reference it directly
+    const cvs_len = total_leaves * cv_size;
+    const suffix_len = n_enc.len + terminator.len;
+    const msg_buf_len = if (first_chunk_direct != null)
+        padding.len + cvs_len + suffix_len
+    else
+        chunk_size + padding.len + cvs_len + suffix_len;
+
+    const msg_buf = try allocator.alignedAlloc(u8, std.mem.Alignment.of(u64), msg_buf_len);
+    defer allocator.free(msg_buf);
+
+    // Set up buffer layout based on whether we're using zero-copy for the first chunk
+    const cvs: []align(@alignOf(u64)) u8 = if (first_chunk_direct != null) blk: {
+        // Zero-copy layout: padding || cvs || suffix
+        @memcpy(msg_buf[0..padding.len], &padding);
+        @memcpy(msg_buf[padding.len + cvs_len ..][0..n_enc.len], n_enc.slice());
+        @memcpy(msg_buf[padding.len + cvs_len + n_enc.len ..][0..terminator.len], &terminator);
+        break :blk @alignCast(msg_buf[padding.len..][0..cvs_len]);
+    } else blk: {
+        // Fallback layout: first_chunk || padding || cvs || suffix
+        view.copyRange(0, chunk_size, msg_buf[0..chunk_size]);
+        @memcpy(msg_buf[chunk_size..][0..padding.len], &padding);
+        @memcpy(msg_buf[chunk_size + padding.len + cvs_len ..][0..n_enc.len], n_enc.slice());
+        @memcpy(msg_buf[chunk_size + padding.len + cvs_len + n_enc.len ..][0..terminator.len], &terminator);
+        break :blk @alignCast(msg_buf[chunk_size + padding.len ..][0..cvs_len]);
+    };
+
+    // Calculate how many full (complete chunk_size) leaves we have
+    const full_leaves = remaining_bytes / chunk_size;
+
+    // Check if there's a partial final leaf (less than chunk_size)
+    const has_partial_leaf = (remaining_bytes % chunk_size) != 0;
+    const partial_leaf_size = if (has_partial_leaf) remaining_bytes % chunk_size else 0;
+
+    // Number of leaves (chunks) per batch in multi-threaded mode
+    const leaves_per_batch = bytes_per_batch / chunk_size;
+
+    // Calculate number of full thread tasks based on complete leaves only
+    const num_full_tasks = full_leaves / leaves_per_batch;
+    const remaining_full_leaves = full_leaves % leaves_per_batch;
 
     var group: Io.Group = .init;
-    var leaves_assigned: usize = 0;
-    var thread_index: usize = 0;
 
-    while (leaves_assigned < total_leaves) {
-        const leaves_for_this_batch = @min(batch_count, total_leaves - leaves_assigned);
-        const batch_start = chunk_size + leaves_assigned * chunk_size;
-        const cvs_offset = leaves_assigned * cv_size;
+    // Spawn tasks for full SIMD batches
+    for (0..num_full_tasks) |task_id| {
+        const start_offset = chunk_size + task_id * bytes_per_batch;
+        const cv_start = task_id * leaves_per_batch * cv_size;
 
-        const ctx = LeafBatchContext{
-            .output_cvs = @alignCast(cvs[cvs_offset .. cvs_offset + leaves_for_this_batch * cv_size]),
-            .batch_start = batch_start,
-            .batch_count = leaves_for_this_batch,
+        group.async(io, LeafThreadContext(Variant).process, .{LeafThreadContext(Variant){
             .view = view,
-            .scratch_buffer = all_scratch[thread_index * scratch_size .. (thread_index + 1) * scratch_size],
-            .total_len = total_len,
-        };
-
-        group.async(io, struct {
-            fn process(c: LeafBatchContext) void {
-                processLeafBatch(Variant, c);
-            }
-        }.process, .{ctx});
-
-        leaves_assigned += leaves_for_this_batch;
-        thread_index += 1;
+            .start_offset = start_offset,
+            .num_leaves = leaves_per_batch,
+            .output_cvs = @alignCast(cvs[cv_start..][0 .. leaves_per_batch * cv_size]),
+        }});
     }
 
+    // Spawn task for remaining full leaves (if any)
+    if (remaining_full_leaves > 0) {
+        const start_offset = chunk_size + num_full_tasks * bytes_per_batch;
+        const cv_start = num_full_tasks * leaves_per_batch * cv_size;
+
+        group.async(io, LeafThreadContext(Variant).process, .{LeafThreadContext(Variant){
+            .view = view,
+            .start_offset = start_offset,
+            .num_leaves = remaining_full_leaves,
+            .output_cvs = @alignCast(cvs[cv_start..][0 .. remaining_full_leaves * cv_size]),
+        }});
+    }
+
+    // Spawn task for the partial final leaf (if required)
+    if (has_partial_leaf) {
+        const start_offset = chunk_size + full_leaves * chunk_size;
+        const cv_start = full_leaves * cv_size;
+
+        group.async(io, FinalLeafContext(Variant).process, .{FinalLeafContext(Variant){
+            .view = view,
+            .start_offset = start_offset,
+            .leaf_len = partial_leaf_size,
+            .output_cv = @alignCast(cvs[cv_start..][0..cv_size]),
+        }});
+    }
+
     // Wait for all threads to complete
     group.wait(io);
 
     // Build final node
-    const n_enc = rightEncode(total_leaves);
-    const final_node_len = chunk_size + 8 + total_leaves * cv_size + n_enc.len + 2;
-    const final_node = try allocator.alloc(u8, final_node_len);
-    defer allocator.free(final_node);
-
-    // Copy first B bytes
-    if (view.tryGetSlice(0, chunk_size)) |first_chunk| {
-        @memcpy(final_node[0..chunk_size], first_chunk);
-    } else {
-        view.copyRange(0, chunk_size, final_node[0..chunk_size]);
-    }
-
-    @memset(final_node[chunk_size..][0..8], 0);
-    final_node[chunk_size] = 0x03;
-    @memcpy(final_node[chunk_size + 8 ..][0 .. total_leaves * cv_size], cvs);
-    @memcpy(final_node[chunk_size + 8 + total_leaves * cv_size ..][0..n_enc.len], n_enc.slice());
-    final_node[final_node_len - 2] = 0xFF;
-    final_node[final_node_len - 1] = 0xFF;
-
-    const final_view = MultiSliceView.init(final_node, &[_]u8{}, &[_]u8{});
+    const final_view = if (first_chunk_direct) |first_chunk|
+        MultiSliceView.init(first_chunk, msg_buf, &[_]u8{})
+    else
+        MultiSliceView.init(msg_buf, &[_]u8{}, &[_]u8{});
     Variant.turboShakeToBuffer(&final_view, 0x06, output);
 }
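For reference, both versions build the same KangarooTwelve final node; the rewrite only changes where the pieces are stored, reusing the first chunk in place when the view is contiguous. A sketch of the layout and its length, with an assumed cv_size of 32 (the KT128 chaining-value size) and a simplified right_encode valid only for small leaf counts:

const std = @import("std");

// Final node layout, as built by both the old and the new code:
//   first_chunk (chunk_size bytes)
//   0x03 followed by seven 0x00 bytes   (the `padding` constant)
//   CV_1 .. CV_n                        (cv_size bytes each)
//   right_encode(n)
//   0xFF 0xFF                           (the `terminator` constant)
//
// Simplified right_encode, valid only for 0 < n < 256: the value byte
// followed by its byte count. The real rightEncode in this file handles
// arbitrary leaf counts.
fn rightEncodeSmall(n: u8) [2]u8 {
    return .{ n, 1 };
}

test "final node length" {
    const chunk_size = 8192; // assumed leaf size
    const cv_size = 32; // assumed KT128 chaining-value size
    const n: u8 = 3; // number of leaves
    const n_enc = rightEncodeSmall(n);
    const final_node_len = chunk_size + 8 + @as(usize, n) * cv_size + n_enc.len + 2;
    try std.testing.expectEqual(@as(usize, 8192 + 8 + 96 + 2 + 2), final_node_len);
}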