mirror of https://codeberg.org/ziglang/zig.git
Io: update for new linked list API
parent 4b657d2de5
commit e1cbcecf89
2 changed files with 43 additions and 46 deletions
@@ -917,17 +917,19 @@ pub const TypeErasedQueue = struct {
     put_index: usize,
     get_index: usize,

-    putters: std.DoublyLinkedList(PutNode),
-    getters: std.DoublyLinkedList(GetNode),
+    putters: std.DoublyLinkedList,
+    getters: std.DoublyLinkedList,

-    const PutNode = struct {
+    const Put = struct {
         remaining: []const u8,
         condition: Condition,
+        node: std.DoublyLinkedList.Node,
     };

-    const GetNode = struct {
+    const Get = struct {
         remaining: []u8,
         condition: Condition,
+        node: std.DoublyLinkedList.Node,
     };

     pub fn init(buffer: []u8) TypeErasedQueue {
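For context on the pattern this hunk migrates to: the new std.DoublyLinkedList is intrusive, so the list stores bare Node values and the surrounding struct is recovered with @fieldParentPtr, exactly as the Put and Get types above now do. A minimal sketch, assuming only the post-change API visible in this diff; the Waiter type and the test are illustrative and not part of the commit:

const std = @import("std");

// Illustrative only: a struct embeds a list node and is recovered from it.
const Waiter = struct {
    value: u32,
    node: std.DoublyLinkedList.Node = .{},
};

test "recover the owner of an intrusive node" {
    var list: std.DoublyLinkedList = .{};
    var w: Waiter = .{ .value = 42 };
    list.append(&w.node);
    // popFirst returns a *Node; @fieldParentPtr walks back to the Waiter.
    const popped: *Waiter = @fieldParentPtr("node", list.popFirst().?);
    try std.testing.expectEqual(@as(u32, 42), popped.value);
}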
@@ -952,16 +954,16 @@ pub const TypeErasedQueue = struct {

         var remaining = elements;
         while (true) {
-            const getter = q.getters.popFirst() orelse break;
-            const copy_len = @min(getter.data.remaining.len, remaining.len);
-            @memcpy(getter.data.remaining[0..copy_len], remaining[0..copy_len]);
+            const getter: *Get = @fieldParentPtr("node", q.getters.popFirst() orelse break);
+            const copy_len = @min(getter.remaining.len, remaining.len);
+            @memcpy(getter.remaining[0..copy_len], remaining[0..copy_len]);
             remaining = remaining[copy_len..];
-            getter.data.remaining = getter.data.remaining[copy_len..];
-            if (getter.data.remaining.len == 0) {
-                getter.data.condition.signal(io);
+            getter.remaining = getter.remaining[copy_len..];
+            if (getter.remaining.len == 0) {
+                getter.condition.signal(io);
                 continue;
             }
-            q.getters.prepend(getter);
+            q.getters.prepend(&getter.node);
             assert(remaining.len == 0);
             return elements.len;
         }
@@ -987,12 +989,10 @@ pub const TypeErasedQueue = struct {
             const total_filled = elements.len - remaining.len;
             if (total_filled >= min) return total_filled;

-            var node: std.DoublyLinkedList(PutNode).Node = .{
-                .data = .{ .remaining = remaining, .condition = .{} },
-            };
-            q.putters.append(&node);
-            try node.data.condition.wait(io, &q.mutex);
-            remaining = node.data.remaining;
+            var pending: Put = .{ .remaining = remaining, .condition = .{}, .node = .{} };
+            q.putters.append(&pending.node);
+            try pending.condition.wait(io, &q.mutex);
+            remaining = pending.remaining;
         }
     }

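Worth noting about the waiting path above: the Put record, including its embedded node, lives in the putter's own frame; it is linked into q.putters, the caller blocks on its condition, and it is unlinked again before the frame goes away. A simplified, single-threaded sketch of that ownership pattern, assuming the same std.DoublyLinkedList API; the names PendingPut and drainOne are illustrative and do not appear in the commit:

const std = @import("std");

// Illustrative only: the pending record and its node live in the caller's
// frame; a consumer pops the node, copies bytes out, and either marks the
// record done or puts the node back at the front of the list.
const PendingPut = struct {
    remaining: []const u8,
    done: bool = false,
    node: std.DoublyLinkedList.Node = .{},
};

fn drainOne(list: *std.DoublyLinkedList, out: []u8) usize {
    const n = list.popFirst() orelse return 0;
    const p: *PendingPut = @fieldParentPtr("node", n);
    const copy_len = @min(out.len, p.remaining.len);
    @memcpy(out[0..copy_len], p.remaining[0..copy_len]);
    p.remaining = p.remaining[copy_len..];
    if (p.remaining.len == 0) {
        p.done = true; // in the real code: condition.signal(io)
    } else {
        list.prepend(&p.node); // not fully drained; keep it at the head
    }
    return copy_len;
}

test "stack-resident pending put" {
    var list: std.DoublyLinkedList = .{};
    var pending: PendingPut = .{ .remaining = "hello" };
    list.append(&pending.node);

    var buf: [8]u8 = undefined;
    try std.testing.expectEqual(@as(usize, 5), drainOne(&list, &buf));
    try std.testing.expect(pending.done);
    try std.testing.expect(list.popFirst() == null);
}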
@@ -1035,16 +1035,16 @@ pub const TypeErasedQueue = struct {
         }
         // Copy directly from putters into buffer.
         while (remaining.len > 0) {
-            const putter = q.putters.popFirst() orelse break;
-            const copy_len = @min(putter.data.remaining.len, remaining.len);
-            @memcpy(remaining[0..copy_len], putter.data.remaining[0..copy_len]);
-            putter.data.remaining = putter.data.remaining[copy_len..];
+            const putter: *Put = @fieldParentPtr("node", q.putters.popFirst() orelse break);
+            const copy_len = @min(putter.remaining.len, remaining.len);
+            @memcpy(remaining[0..copy_len], putter.remaining[0..copy_len]);
+            putter.remaining = putter.remaining[copy_len..];
             remaining = remaining[copy_len..];
-            if (putter.data.remaining.len == 0) {
-                putter.data.condition.signal(io);
+            if (putter.remaining.len == 0) {
+                putter.condition.signal(io);
             } else {
                 assert(remaining.len == 0);
-                q.putters.prepend(putter);
+                q.putters.prepend(&putter.node);
                 return fillRingBufferFromPutters(q, io, buffer.len);
             }
         }
@@ -1052,12 +1052,10 @@ pub const TypeErasedQueue = struct {
             const total_filled = buffer.len - remaining.len;
             if (total_filled >= min) return total_filled;

-            var node: std.DoublyLinkedList(GetNode).Node = .{
-                .data = .{ .remaining = remaining, .condition = .{} },
-            };
-            q.getters.append(&node);
-            try node.data.condition.wait(io, &q.mutex);
-            remaining = node.data.remaining;
+            var pending: Get = .{ .remaining = remaining, .condition = .{}, .node = .{} };
+            q.getters.append(&pending.node);
+            try pending.condition.wait(io, &q.mutex);
+            remaining = pending.remaining;
         }
     }

@@ -1067,26 +1065,26 @@ pub const TypeErasedQueue = struct {
     /// buffers been fully copied.
     fn fillRingBufferFromPutters(q: *TypeErasedQueue, io: Io, len: usize) usize {
         while (true) {
-            const putter = q.putters.popFirst() orelse return len;
+            const putter: *Put = @fieldParentPtr("node", q.putters.popFirst() orelse return len);
             const available = q.buffer[q.put_index..];
-            const copy_len = @min(available.len, putter.data.remaining.len);
-            @memcpy(available[0..copy_len], putter.data.remaining[0..copy_len]);
-            putter.data.remaining = putter.data.remaining[copy_len..];
+            const copy_len = @min(available.len, putter.remaining.len);
+            @memcpy(available[0..copy_len], putter.remaining[0..copy_len]);
+            putter.remaining = putter.remaining[copy_len..];
             q.put_index += copy_len;
-            if (putter.data.remaining.len == 0) {
-                putter.data.condition.signal(io);
+            if (putter.remaining.len == 0) {
+                putter.condition.signal(io);
                 continue;
             }
             const second_available = q.buffer[0..q.get_index];
-            const second_copy_len = @min(second_available.len, putter.data.remaining.len);
-            @memcpy(second_available[0..second_copy_len], putter.data.remaining[0..second_copy_len]);
-            putter.data.remaining = putter.data.remaining[copy_len..];
+            const second_copy_len = @min(second_available.len, putter.remaining.len);
+            @memcpy(second_available[0..second_copy_len], putter.remaining[0..second_copy_len]);
+            putter.remaining = putter.remaining[copy_len..];
             q.put_index = copy_len;
-            if (putter.data.remaining.len == 0) {
-                putter.data.condition.signal(io);
+            if (putter.remaining.len == 0) {
+                putter.condition.signal(io);
                 continue;
             }
-            q.putters.prepend(putter);
+            q.putters.prepend(&putter.node);
             return len;
         }
     }
@@ -13,7 +13,7 @@ main_fiber_buffer: [@sizeOf(Fiber) + Fiber.max_result_size]u8 align(@alignOf(Fib
 threads: Thread.List,
 detached: struct {
     mutex: std.Io.Mutex,
-    list: std.DoublyLinkedList(void),
+    list: std.DoublyLinkedList,
 },

 /// Empirically saw >128KB being used by the self-hosted backend to panic.
@@ -226,7 +226,6 @@ pub fn deinit(el: *EventLoop) void {
             detached.detached_queue_node = .{
                 .prev = &detached.detached_queue_node,
                 .next = &detached.detached_queue_node,
-                .data = {},
             };
             break :detached_future @ptrCast(detached.fiber);
         }, &.{}, .@"1");
@@ -753,7 +752,7 @@ const DetachedClosure = struct {
     event_loop: *EventLoop,
     fiber: *Fiber,
     start: *const fn (context: *const anyopaque) void,
-    detached_queue_node: std.DoublyLinkedList(void).Node,
+    detached_queue_node: std.DoublyLinkedList.Node,

     fn contextPointer(closure: *DetachedClosure) [*]align(Fiber.max_context_align.toByteUnits()) u8 {
         return @alignCast(@as([*]u8, @ptrCast(closure)) + @sizeOf(DetachedClosure));
@@ -818,7 +817,7 @@ fn go(
         .event_loop = event_loop,
         .fiber = fiber,
         .start = start,
-        .detached_queue_node = .{ .data = {} },
+        .detached_queue_node = .{},
     };
     {
         event_loop.detached.mutex.lock(event_loop.io()) catch |err| switch (err) {