Tcp thread pool issue

hi,
To teach myself Zig, I am writing a simple TCP server that uses a thread pool to handle reading from each stream.

const std = @import("std");

pub fn main() !void {
    // Arena over the page allocator: everything allocated through it is
    // released in one shot by arena.deinit() below.
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const allocator = arena.allocator();
    // Threaded Io implementation (unreleased/master std.Io API); provides the
    // `std.Io` interface the networking calls below would use.
    var thread_io = std.Io.Threaded.init(allocator);
    defer thread_io.deinit();
    // Server startup is currently disabled; the test "stream" drives
    // start_server instead.
    // const opts = std.Io.net.IpAddress.ListenOptions{
    //     .mode = .stream,
    //     .protocol = .tcp,
    //     .reuse_address = true,
    // };
    // const address = try std.Io.net.IpAddress.parse("0.0.0.0", 32100);
    // const buffer = try arena.allocator().create([]u8);
    // defer arena.allocator().destroy(buffer);
    // try start_server(address, thread_io.io(), opts, buffer);
}

// Worker pool shared by start_server; initialized there via thread_pool.init.
var thread_pool: std.Thread.Pool = undefined;
// Backing allocator for the pool's internal allocations.
var gpa = std.heap.GeneralPurposeAllocator(.{}).init;
// NOTE(review): a container-level `const` initializer is evaluated at comptime;
// whether gpa.allocator() captured here is valid at runtime should be
// confirmed — consider building the options inside start_server instead.
const pool_opts: std.Thread.Pool.Options = .{ .allocator = gpa.allocator(), .n_jobs = 4 };

/// Listens on `address`, accepts one connection, and hands it to the thread
/// pool for reading.
/// BUG (this is the cause of the BADF panic discussed below, and of the reply
/// "client.close() goes out of scope before it's used in stream_data()"):
/// `defer client.close(io)` runs when this function returns, which can happen
/// before the pooled stream_data task has read from the socket — the worker
/// then reads from an already-closed file descriptor.
fn start_server(address: std.Io.net.IpAddress, io: std.Io, opts: std.Io.net.IpAddress.ListenOptions) !void {
    try thread_pool.init(pool_opts);
    // Deinit joins the pool; but the LIFO defer order here means client.close
    // (declared later) runs BEFORE the pool is joined.
    defer thread_pool.deinit();

    // while (true) {
    std.debug.print("starting server...\n", .{});
    var stream = try address.listen(io, opts);
    defer stream.deinit(io);
    const client = try stream.accept(io);
    defer client.close(io);
    try thread_pool.spawn(stream_data, .{ client, io });
}

/// Pool task: reads one newline-terminated message from `client` and prints it.
/// Does not close the connection; the caller owns the socket's lifetime.
fn stream_data(client: std.Io.net.Stream, io: std.Io) void {
    // while (true) {
    std.debug.print("reading data from server\n", .{});
    // Fixed read buffer backing the stream reader.
    var buf: [127]u8 = undefined;
    var client_reader = client.reader(io, &buf);
    // Blocks until '\n' is seen; fails with e.g. BADF if the accept side has
    // already closed the descriptor (the bug discussed in this thread).
    const data = client_reader.interface.takeDelimiterInclusive('\n') catch |e| {
        std.debug.print("err:{s}\n", .{@errorName(e)});
        return;
    };
    if (data.len > 0) {
        std.debug.print("data:{s}\n", .{data});
    }
    // }
}

test "stream" {
    const address = try std.Io.net.IpAddress.parse("0.0.0.0", 32100);
    const listen_opts = std.Io.net.IpAddress.ListenOptions{
        .mode = .stream,
        .protocol = .tcp,
        .reuse_address = true,
    };
    var gp_alloc = std.heap.DebugAllocator(.{}).init;
    defer {
        _ = gp_alloc.deinit();
    }
    var thread_io = std.Io.Threaded.init(gp_alloc.allocator());
    defer thread_io.deinit();
    const opts = std.Io.net.IpAddress.ConnectOptions{
        .mode = .stream,
        .protocol = .tcp,
    };
    var server_thread = try std.Thread.spawn(.{}, start_server, .{ address, thread_io.io(), listen_opts });
    server_thread.detach();
    try std.Io.sleep(std.testing.io, std.Io.Duration{ .nanoseconds = 10000 }, std.Io.Clock.awake);
    var client_write_thread = try std.Thread.spawn(.{}, write_data_to_stream, .{ address, thread_io.io(), opts });
    client_write_thread.join();
}

/// Connects to `address` and sends a single newline-terminated message.
/// A failed connect is reported on stderr and swallowed; write/flush errors
/// propagate to the caller.
fn write_data_to_stream(address: std.Io.net.IpAddress, io: std.Io, opts: std.Io.net.IpAddress.ConnectOptions) !void {
    std.debug.print("starting client\n", .{});
    const conn = address.connect(io, opts) catch |err| {
        std.debug.print("error connecting to server:{s}\n", .{@errorName(err)});
        return;
    };
    defer conn.close(io);
    // Buffered writer over the connection; flush pushes the bytes out.
    var out_buf: [124]u8 = undefined;
    var conn_writer = conn.writer(io, &out_buf);
    try conn_writer.interface.writeAll("some data from client\n");
    try conn_writer.interface.flush();
    std.debug.print("sent data...\n", .{});
}

To test the implementation I have the unit test "stream" in the file.
To give the server room to start and initialize, I followed the approach below:

    var server_thread = try std.Thread.spawn(.{}, start_server, .{ address, thread_io.io(), listen_opts });
    server_thread.detach();
    try std.Io.sleep(std.testing.io, std.Io.Duration{ .nanoseconds = 10000 }, std.Io.Clock.awake);
    var client_write_thread = try std.Thread.spawn(.{}, write_data_to_stream, .{ address, thread_io.io(), opts });
    client_write_thread.join();
  • The server runs on its own detached thread so that its blocking I/O does not block the client write thread.
    Inside start_server I have a thread pool initialized, and each stream is handled by a thread of its own.
    When i execute the zig test src/main.zig i get the following error
starting client
starting server...
sent data...
reading data from server
thread 7903 panic: programmer bug caused syscall error: BADF
/home/karthick/.local/zig/lib/std/Io/Threaded.zig:5191:34: 0x104396a in errnoBug (std.zig)
    if (is_debug) std.debug.panic("programmer bug caused syscall error: {t}", .{err});
                                 ^
/home/karthick/.local/zig/lib/std/Io/Threaded.zig:4008:43: 0x10286ea in netReadPosix (std.zig)
            .BADF => |err| return errnoBug(err), // File descriptor used after closed.
                                          ^
/home/karthick/.local/zig/lib/std/Io/net.zig:1238:40: 0x1114db7 in readVec (std.zig)
            const n = io.vtable.netRead(io.userdata, r.stream.socket.handle, dest) catch |err| {
                                       ^
/home/karthick/.local/zig/lib/std/Io/Reader.zig:1116:29: 0x10c6922 in fillMore (std.zig)
    _ = try r.vtable.readVec(r, &bufs);
                            ^
/home/karthick/.local/zig/lib/std/Io/Reader.zig:812:21: 0x10a24f9 in peekDelimiterInclusive (std.zig)
        try fillMore(r);
                    ^
/home/karthick/.local/zig/lib/std/Io/Reader.zig:784:48: 0x10c2db6 in takeDelimiterInclusive (std.zig)
    const result = try r.peekDelimiterInclusive(delimiter);
                                               ^
/home/karthick/projects/zig-tcp/src/main.zig:42:64: 0x109d3c4 in stream_data (main.zig)
    const data = client_reader.interface.takeDelimiterInclusive('\n') catch |e| {
                                                               ^
/home/karthick/.local/zig/lib/std/Thread/Pool.zig:230:39: 0x1070abd in runFn (std.zig)
            @call(.auto, func, closure.arguments);
                                      ^
/home/karthick/.local/zig/lib/std/Thread/Pool.zig:293:27: 0x10ed910 in worker (std.zig)
            runnable.runFn(runnable, id);
                          ^

What I understand from the message is the note "// File descriptor used after closed."
Can someone point out where I am closing the file descriptor prematurely? All my closes use defer.
thanks in advance for help and time teaching me.

Thread.Pool has been removed. Could you upgrade to latest master, and then post if there are still issues?

Release notes: https://codeberg.org/ziglang/zig/pulls/30557#issuecomment-9143961

Thanks will update and post outcome in the thread

No luck — it is now failing inside Threaded.init.

var gpa = std.heap.GeneralPurposeAllocator(.{}).init;
// Group used to spawn and await the per-connection tasks.
var group: std.Io.Group = .init;
const init_option: std.Io.Threaded.InitOptions = .{};
// BUG (see the compile note below): container-level initializers are evaluated
// at comptime, and Threaded.init calls std.Thread.getCpuCount(), which cannot
// run at comptime — Threaded must be initialized inside a function.
var thread_io = std.Io.Threaded.init(gpa.allocator(), init_option);

/// Accepts a single connection and spawns stream_data on the global `group`.
/// NOTE(review): the group.async task is never awaited here, so the
/// `defer client.close(io)` below can still run before stream_data reads —
/// the same premature-close problem as the original Thread.Pool version.
fn start_server(address: std.Io.net.IpAddress, io: std.Io, opts: std.Io.net.IpAddress.ListenOptions) !void {
    // Tearing down the global io implementation and allocator from inside the
    // server is unusual; normally the owner of these globals deinits them.
    defer {
        thread_io.deinit();
        _ = gpa.deinit();
    }

    // while (true) {
    std.debug.print("starting server...\n", .{});
    var stream = try address.listen(io, opts);
    defer stream.deinit(io);
    const client = try stream.accept(io);
    defer client.close(io);
    group.async(io, stream_data, .{ client, io });
}

/// Group task: reads one newline-terminated message from `client` and prints it.
/// Does not close the connection; the caller owns the socket's lifetime.
fn stream_data(client: std.Io.net.Stream, io: std.Io) void {
    std.debug.print("reading data from server\n", .{});
    // Fixed read buffer backing the stream reader.
    var buf: [127]u8 = undefined;
    var client_reader = client.reader(io, &buf);
    // Read errors (peer closed, reset, descriptor closed, ...) are expected at
    // runtime, so report and bail out. `catch unreachable` here would be
    // undefined behavior in release builds whenever the read actually fails —
    // this matches the error handling of the earlier version of this function.
    const data = client_reader.interface.takeDelimiterInclusive('\n') catch |e| {
        std.debug.print("err:{s}\n", .{@errorName(e)});
        return;
    };
    if (data.len > 0) {
        std.debug.print("data:{s}\n", .{data});
    }
}

test "stream" {
    const address = try std.Io.net.IpAddress.parse("0.0.0.0", 32100);
    const listen_opts = std.Io.net.IpAddress.ListenOptions{
        .mode = .stream,
        .protocol = .tcp,
        .reuse_address = true,
    };
    const opts = std.Io.net.IpAddress.ConnectOptions{
        .mode = .stream,
        .protocol = .tcp,
    };
    _ = std.Io.async(std.testing.io, start_server, .{ address, std.testing.io, listen_opts });
    try std.Io.sleep(std.testing.io, std.Io.Duration{ .nanoseconds = 100000 }, std.Io.Clock.awake);
    var client_async = std.Io.async(std.testing.io, write_data_to_stream, .{ address, std.testing.io, opts });
    try client_async.await(std.testing.io);
}

/// Dials `address` and writes one message terminated by '\n'.
/// Connection failures are logged and ignored; write/flush failures bubble up
/// to the caller.
fn write_data_to_stream(address: std.Io.net.IpAddress, io: std.Io, opts: std.Io.net.IpAddress.ConnectOptions) !void {
    std.debug.print("starting client\n", .{});
    const sock = address.connect(io, opts) catch |connect_err| {
        std.debug.print("error connecting to server:{s}\n", .{@errorName(connect_err)});
        return;
    };
    defer sock.close(io);
    // Small stack buffer for the buffered stream writer.
    var write_buf: [124]u8 = undefined;
    var sock_writer = sock.writer(io, &write_buf);
    try sock_writer.interface.writeAll("some data from client\n");
    try sock_writer.interface.flush();
    std.debug.print("sent data...\n", .{});
}

home/karthick/.local/zig/lib/std/Io/Threaded.zig:1158:45: note: called at comptime from here const cpu_count = std.Thread.getCpuCount();
It is trying to get the CPU count and failing.

You should init Threaded in a function, the error message is a hint that you’re in comptime context.

The group.async call must be await’ed in your example, otherwise you’ll hit your original problem (as the connection close)

The _ = std.Io.async call would at a minimum leak, you must cancel or await the future.

I’d recommend getting your test working first with std.testing.io and then tackle passing around your own io instance.

As std.Io on master is a quickly moving target, I think you’d benefit from looking at the test code that ships with the compiler commit you’re working on.

Thanks — my approach is the same: get the test working with std.testing.io first.
I have a question about the await on the group.
My understanding is that await waits until the operation is complete. When start_server calls await, won't that block the server from handling other incoming requests?

yeah this was just to get your basic test working - a real server handling many clients will call accept in a loop and this particular group await probably goes away since your program is then not exiting due to the accept loop.

client.close() goes out of scope before it’s used in stream_data()

1 Like
fn start_server(address: std.Io.net.IpAddress, io: std.Io, opts: std.Io.net.IpAddress.ListenOptions) !void {
    while (true) {
        std.debug.print("starting server...\n", .{});
        // NOTE(review): listen() creates a new listening socket every
        // iteration; it should be hoisted out of the loop — one Server can
        // accept many connections (see the follow-up reply).
        var stream = try address.listen(io, opts);
        defer stream.deinit(io);
        const client = try stream.accept(io);
        group.async(io, stream_data, .{ client, io });
        // Awaiting the group before closing avoids the premature-close BADF,
        // but also serializes clients: only one connection is served at a time.
        group.await(io) catch unreachable;
        // Defers inside a loop body run at the end of EVERY iteration, so
        // gpa.deinit() would run repeatedly here — presumably not intended.
        defer {
            _ = gpa.deinit();
            client.close(io);
        }
    }
}

I updated the code as suggested, and the test is working as expected.
I still have the issue if I disable `group.await(io) catch unreachable;` — it gives the same error as the original issue. As mentioned by grayhatter, I moved `defer client.close(io);` to after the stream_data call that reads the data.
I will check the unit tests in std.Io.Threaded.

Yes, this is why the group await was suggested in the first place, to avoid the premature connection closure. For a real server, you’ll solve this by continuously accepting connections and only close the connection when you’re done (the timing of which is application protocol specific - sometimes it’s a long running connection)

Thank you @cryptocode and @grayhatter — I updated the code as suggested; below is a working copy of the server.

fn start_server(address: std.Io.net.IpAddress, io: std.Io, opts: std.Io.net.IpAddress.ListenOptions) !void {
    // Only ever holds the most recent connection; the single defer below
    // closes just that last client (a defer evaluates its expression at scope
    // exit with the variable's FINAL value). Earlier clients are never closed
    // by this function — handlers should own closing their connection.
    var client: std.Io.net.Stream = undefined;
    defer {
        client.close(io);
        _ = gpa.deinit();
    }
    while (true) {
        std.debug.print("starting server...\n", .{});
        // NOTE(review): listen() belongs outside the loop — one Server can
        // accept many connections (see the reply below).
        var stream = try address.listen(io, opts);
        defer stream.deinit(io);
        client = try stream.accept(io);
        group.async(io, stream_data, .{ client, io });
    }
}

the server is working as expected.

zig test --test-filter stream  src/main.zig                                                                                                                           
starting server...
starting client
sent data...
starting server...
reading data from server
data:some data from client
1 Like

You probably want to use group.concurrent in there, unless you plan to only serve one client at a time.

Getting closer, though you shouldn’t call listen inside the loop since it returns a Server from which you can accept many client connections.

thanks will update and post the updated version

fn start_server(address: std.Io.net.IpAddress, io: std.Io, opts: std.Io.net.IpAddress.ListenOptions) !void {
    // NOTE(review): if accept below fails before ever succeeding, the defer
    // closes an `undefined` Stream. Also, only the most recent client is ever
    // closed (defer sees the final value of `client`); as discussed above, a
    // real server closes each connection in its handler when it is done.
    var client: std.Io.net.Stream = undefined;
    // Listening socket created once, outside the accept loop.
    var stream = try address.listen(io, opts);
    defer {
        client.close(io);
        _ = gpa.deinit();
        stream.deinit(io);
    }
    while (true) {
        std.debug.print("starting server...\n", .{});
        client = try stream.accept(io);
        // concurrent: each connection is handled without serializing on await.
        try group.concurrent(io, stream_data, .{ client, io });
    }
}