Good Evening,
After trying to resolve this myself, I'm quite stumped. I'm running into an issue where I receive the assertion failure “group task acknowledged cancelation but did not return error.Canceled”. The relevant code blocks are below. I've tried to do my due diligence and searched through other threads about cancellation, but I still don't understand what I should be doing in this particular case. Thank you to everyone in advance.
/// Returns a service type that owns a fixed table of `config.maxConnections`
/// connection slots and one `Io.Group` in which a listener task per slot runs.
/// Every decoded message is handed to `config.dispatch`.
pub fn WaylandProtocolService(comptime config: WaylandProtocolServiceConfig) type {
    return struct {
        /// A single connection slot. `initialized` doubles as the "slot in use" flag
        /// that the service consults when it looks for a free slot.
        const Instance = struct {
            reader: *Reader,
            allocator: Allocator,
            initialized: bool,

            const InitConfig = struct {
                reader: *Reader,
                allocator: Allocator,
            };

            /// Builds an in-use slot from `initConfig`.
            pub fn init(initConfig: InitConfig) @This() {
                return .{
                    .reader = initConfig.reader,
                    .allocator = initConfig.allocator,
                    .initialized = true,
                };
            }

            /// Marks the slot as free again. Never fails; the error union is kept
            /// so existing `try instance.deinit()` call sites keep compiling.
            pub fn deinit(this: *@This()) !void {
                this.initialized = false;
            }

            //pub const WaylandProtocolServiceListenErrors = error{ ReadFailed, EndOfStream, ConcurrencyUnavailable };

            /// Task body: reads messages from `this.reader` until a read or decode
            /// fails, dispatching each one tagged with `appId`.
            ///
            /// Returns `error.Canceled` when `Io.checkCancel` reports a cancel request.
            /// A failed read or decode ends the task without an error.
            ///
            /// NOTE(review): the assertion "group task acknowledged cancelation but did
            /// not return error.Canceled" is consistent with a read being interrupted by
            /// the cancel request and that failure being discarded below. `Reader`'s
            /// error set is not visible from this block; if it cannot carry
            /// `error.Canceled` (presumably the case for `std.Io.Reader`, which reports
            /// `ReadFailed` - confirm), whoever owns the concrete stream reader has to
            /// expose the stored cause so this task can return `error.Canceled` itself.
            pub fn listen(this: *@This(), allocator: Allocator, io: Io, appId: usize) !void {
                // Free the slot on every exit path, including `error.Canceled`;
                // this is the same state change `deinit` performs.
                defer this.initialized = false;

                while (true) {
                    try Io.checkCancel(io);

                    // Peek exactly as many bytes as are copied into the header below.
                    const header_bytes = this.reader.peek(@sizeOf(WaylandHeader)) catch return;
                    var header: WaylandHeader = undefined;
                    @memcpy(asBytes(&header), header_bytes[0..@sizeOf(WaylandHeader)]);

                    const payload = this.reader.take(@as(usize, header.size)) catch return;
                    const message = WaylandMessage.deserialize(allocator, payload) catch return;
                    defer allocator.destroy(message);

                    config.dispatch(.{ .waylandMessage = message, .appId = appId });
                }
            }
        };

        //fields
        group: Io.Group,
        instances: [config.maxConnections]Instance,
        pos: usize,

        //public functions

        /// Creates a service with an empty group and every slot marked free.
        /// Only the `initialized` flag of each slot is written; the other slot
        /// fields stay undefined until `listen` fills them.
        pub fn init() @This() {
            var service: @This() = .{ .group = Io.Group.init, .instances = undefined, .pos = 0 };
            for (&service.instances) |*slot| {
                slot.initialized = false;
            }
            return service;
        }

        /// Cancels all listener tasks and frees every slot.
        /// Takes `io` because `cancel` requires it; the previous zero-argument form
        /// called `this.cancel()` without it and could not compile once referenced.
        pub fn deinit(this: *@This(), io: Io) !void {
            try this.cancel(io);
        }

        /// Claims a free slot for `reader` and starts its listener task in the group.
        /// Returns a pointer to the claimed slot.
        /// Fails with `error.CapacityReached` when no slot is free, or with whatever
        /// `group.concurrent` reports; in the latter case the slot is released again.
        pub fn listen(this: *@This(), allocator: Allocator, io: Io, reader: *Reader) !*Instance {
            try this.seek();
            const slot = &this.instances[this.pos];
            slot.* = Instance.init(.{ .allocator = allocator, .reader = reader });
            // Without this, a failed spawn would leave the slot permanently "in use".
            errdefer slot.initialized = false;
            _ = try this.group.concurrent(io, Instance.listen, .{ slot, allocator, io, this.pos });
            return slot;
        }

        /// Cancels the task group, then frees every slot still marked in use.
        pub fn cancel(this: *@This(), io: Io) !void {
            this.group.cancel(io);
            for (&this.instances) |*instance| {
                if (instance.initialized) {
                    try instance.deinit();
                }
            }
        }

        pub const FindIndexFromPointerErrors = error{OutOfBounds};

        /// Translates a pointer to one of this service's slots into its array index.
        /// Returns `error.OutOfBounds` when `element` does not point at the start of
        /// a slot inside `instances`.
        pub fn findIndexFromPointer(self: *@This(), element: *Instance) FindIndexFromPointerErrors!usize {
            const base = @intFromPtr(&self.instances);
            const address = @intFromPtr(element);
            if (address < base) return FindIndexFromPointerErrors.OutOfBounds;

            // Pointer difference is in bytes; divide by the element size to get an index.
            const offset = address - base;
            const index = offset / @sizeOf(Instance);
            if (index >= self.instances.len or offset % @sizeOf(Instance) != 0) {
                return FindIndexFromPointerErrors.OutOfBounds;
            }
            return index;
        }

        //private functions
        const WaylandProtocolServiceSeekErrors = error{CapacityReached};

        /// Advances `pos` (wrapping at the end of the table) to the next free slot,
        /// starting at the current `pos`. Visits each slot at most once and fails
        /// with `error.CapacityReached` when all of them are in use.
        fn seek(this: *@This()) WaylandProtocolServiceSeekErrors!void {
            for (0..this.instances.len) |_| {
                if (this.pos == this.instances.len) this.pos = 0;
                if (!this.instances[this.pos].initialized) return;
                this.pos += 1;
            }
            return error.CapacityReached;
        }
    };
}
//specifically these functions:
/// Claims a free slot for `reader` and starts its listener task in the group.
/// Returns a pointer to the claimed slot.
/// Fails with whatever `seek` or `group.concurrent` reports; if spawning the
/// task fails, the slot is released again instead of staying marked in use.
pub fn listen(this: *@This(), allocator: Allocator, io: Io, reader: *Reader) !*Instance {
    try this.seek();
    const slot = &this.instances[this.pos];
    slot.* = Instance.init(.{ .allocator = allocator, .reader = reader });
    // Without this, a failed spawn would leave the slot permanently "in use".
    errdefer slot.initialized = false;
    _ = try this.group.concurrent(io, Instance.listen, .{ slot, allocator, io, this.pos });
    return slot;
}
/// Task body: reads messages from `this.reader` until a read or decode fails,
/// dispatching each one tagged with `appId`.
///
/// Returns `error.Canceled` when `Io.checkCancel` reports a cancel request.
/// A failed read or decode ends the task without an error.
///
/// NOTE(review): the assertion "group task acknowledged cancelation but did not
/// return error.Canceled" is consistent with a read being interrupted by the
/// cancel request and that failure being discarded below. `Reader`'s error set
/// is not visible from this block; if it cannot carry `error.Canceled`
/// (presumably the case for `std.Io.Reader`, which reports `ReadFailed` -
/// confirm), whoever owns the concrete stream reader has to expose the stored
/// cause so this task can return `error.Canceled` itself.
pub fn listen(this: *@This(), allocator: Allocator, io: Io, appId: usize) !void {
    // Free the slot on every exit path, including `error.Canceled`;
    // this is the same state change `deinit` performs.
    defer this.initialized = false;

    while (true) {
        try Io.checkCancel(io);

        // Peek exactly as many bytes as are copied into the header below.
        const header_bytes = this.reader.peek(@sizeOf(WaylandHeader)) catch return;
        var header: WaylandHeader = undefined;
        @memcpy(asBytes(&header), header_bytes[0..@sizeOf(WaylandHeader)]);

        const payload = this.reader.take(@as(usize, header.size)) catch return;
        const message = WaylandMessage.deserialize(allocator, payload) catch return;
        defer allocator.destroy(message);

        config.dispatch(.{ .waylandMessage = message, .appId = appId });
    }
}
//which are called as follows:
// Service with room for 1024 connections; every decoded message goes to `testDispatch`.
const ProtoServ = Wayland.WaylandProtocolService(.{ .maxConnections = 1024, .dispatch = testDispatch });
var protoServ: ProtoServ = .init();
// Start one listener task; the returned slot pointer is not needed here.
_ = try protoServ.listen(sta, io, x.readerInterface());
// Let the listener run for a second before shutting it down.
try std.Io.sleep(io, .fromMilliseconds(1000), .real);
// Go through the service's own `cancel` rather than `group.cancel` directly:
// it cancels the group and also frees the slots that are still marked in use.
try protoServ.cancel(io);
//and results in the following errors:
/nix/store/3fd6p93bxsdaz9zlcn9c6zmny5a2i6nb-zig-0.16.0-dev.2368+380ea6fb5/lib/std/debug.zig:419:14: 0x1032679 in assert (std.zig)
if (!ok) unreachable; // assertion failure
^
/nix/store/3fd6p93bxsdaz9zlcn9c6zmny5a2i6nb-zig-0.16.0-dev.2368+380ea6fb5/lib/std/Io/Threaded.zig:347:23: 0x108c826 in start (std.zig)
assert(!cancel_acknowledged); // group task acknowledged cancelation but did not return `error.Canceled`
^
/nix/store/3fd6p93bxsdaz9zlcn9c6zmny5a2i6nb-zig-0.16.0-dev.2368+380ea6fb5/lib/std/Io/Threaded.zig:1550:29: 0x10dfb9a in worker (std.zig)
runnable.startFn(runnable, &thread, t);
^
/nix/store/3fd6p93bxsdaz9zlcn9c6zmny5a2i6nb-zig-0.16.0-dev.2368+380ea6fb5/lib/std/Thread.zig:561:13: 0x10b83a5 in callFn__anon_15363 (std.zig)
@call(.auto, f, args);
^
/nix/store/3fd6p93bxsdaz9zlcn9c6zmny5a2i6nb-zig-0.16.0-dev.2368+380ea6fb5/lib/std/Thread.zig:1506:30: 0x108b3c0 in entryFn (std.zig)
return callFn(f, self.fn_args);
^
/nix/store/3fd6p93bxsdaz9zlcn9c6zmny5a2i6nb-zig-0.16.0-dev.2368+380ea6fb5/lib/std/os/linux/x86_64.zig:105:5: 0x10b8485 in clone (std.zig)
asm volatile (
^