hi,
to teach myself am writing a simple tcp server which is using a thread pool to handle the stream read.
const std = @import("std");
pub fn main() !void {
var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
defer arena.deinit();
const allocator = arena.allocator();
var thread_io = std.Io.Threaded.init(allocator);
defer thread_io.deinit();
// const opts = std.Io.net.IpAddress.ListenOptions{
// .mode = .stream,
// .protocol = .tcp,
// .reuse_address = true,
// };
// const address = try std.Io.net.IpAddress.parse("0.0.0.0", 32100);
// const buffer = try arena.allocator().create([]u8);
// defer arena.allocator().destroy(buffer);
// try start_server(address, thread_io.io(), opts, buffer);
}
var thread_pool: std.Thread.Pool = undefined;
var gpa = std.heap.GeneralPurposeAllocator(.{}).init;
const pool_opts: std.Thread.Pool.Options = .{ .allocator = gpa.allocator(), .n_jobs = 4 };
fn start_server(address: std.Io.net.IpAddress, io: std.Io, opts: std.Io.net.IpAddress.ListenOptions) !void {
try thread_pool.init(pool_opts);
defer thread_pool.deinit();
// while (true) {
std.debug.print("starting server...\n", .{});
var stream = try address.listen(io, opts);
defer stream.deinit(io);
const client = try stream.accept(io);
defer client.close(io);
try thread_pool.spawn(stream_data, .{ client, io });
}
fn stream_data(client: std.Io.net.Stream, io: std.Io) void {
// while (true) {
std.debug.print("reading data from server\n", .{});
var buf: [127]u8 = undefined;
var client_reader = client.reader(io, &buf);
const data = client_reader.interface.takeDelimiterInclusive('\n') catch |e| {
std.debug.print("err:{s}\n", .{@errorName(e)});
return;
};
if (data.len > 0) {
std.debug.print("data:{s}\n", .{data});
}
// }
}
test "stream" {
const address = try std.Io.net.IpAddress.parse("0.0.0.0", 32100);
const listen_opts = std.Io.net.IpAddress.ListenOptions{
.mode = .stream,
.protocol = .tcp,
.reuse_address = true,
};
var gp_alloc = std.heap.DebugAllocator(.{}).init;
defer {
_ = gp_alloc.deinit();
}
var thread_io = std.Io.Threaded.init(gp_alloc.allocator());
defer thread_io.deinit();
const opts = std.Io.net.IpAddress.ConnectOptions{
.mode = .stream,
.protocol = .tcp,
};
var server_thread = try std.Thread.spawn(.{}, start_server, .{ address, thread_io.io(), listen_opts });
server_thread.detach();
try std.Io.sleep(std.testing.io, std.Io.Duration{ .nanoseconds = 10000 }, std.Io.Clock.awake);
var client_write_thread = try std.Thread.spawn(.{}, write_data_to_stream, .{ address, thread_io.io(), opts });
client_write_thread.join();
}
fn write_data_to_stream(address: std.Io.net.IpAddress, io: std.Io, opts: std.Io.net.IpAddress.ConnectOptions) !void {
// while (true) {
std.debug.print("starting client\n", .{});
const client_stream = address.connect(io, opts) catch |e| {
std.debug.print("error connecting to server:{s}\n", .{@errorName(e)});
return;
// continue;
};
defer client_stream.close(io);
var buffer: [124]u8 = undefined;
var client_writer = client_stream.writer(io, &buffer);
try client_writer.interface.writeAll("some data from client\n");
try client_writer.interface.flush();
std.debug.print("sent data...\n", .{});
// }
}
to test the implementation i have unit test stream in the file.
to give room for server to start and initialize i have followed the approach as below
var server_thread = try std.Thread.spawn(.{}, start_server, .{ address, thread_io.io(), listen_opts });
server_thread.detach();
try std.Io.sleep(std.testing.io, std.Io.Duration{ .nanoseconds = 10000 }, std.Io.Clock.awake);
var client_write_thread = try std.Thread.spawn(.{}, write_data_to_stream, .{ address, thread_io.io(), opts });
client_write_thread.join();
- not to block client write thread with server as the server is a blocking IO operation.
inside the start_server i have a thread pool initialized and each stream is handled by a thread on its own.
When i execute thezig test src/main.zigi get the following error
starting client
starting server...
sent data...
reading data from server
thread 7903 panic: programmer bug caused syscall error: BADF
/home/karthick/.local/zig/lib/std/Io/Threaded.zig:5191:34: 0x104396a in errnoBug (std.zig)
if (is_debug) std.debug.panic("programmer bug caused syscall error: {t}", .{err});
^
/home/karthick/.local/zig/lib/std/Io/Threaded.zig:4008:43: 0x10286ea in netReadPosix (std.zig)
.BADF => |err| return errnoBug(err), // File descriptor used after closed.
^
/home/karthick/.local/zig/lib/std/Io/net.zig:1238:40: 0x1114db7 in readVec (std.zig)
const n = io.vtable.netRead(io.userdata, r.stream.socket.handle, dest) catch |err| {
^
/home/karthick/.local/zig/lib/std/Io/Reader.zig:1116:29: 0x10c6922 in fillMore (std.zig)
_ = try r.vtable.readVec(r, &bufs);
^
/home/karthick/.local/zig/lib/std/Io/Reader.zig:812:21: 0x10a24f9 in peekDelimiterInclusive (std.zig)
try fillMore(r);
^
/home/karthick/.local/zig/lib/std/Io/Reader.zig:784:48: 0x10c2db6 in takeDelimiterInclusive (std.zig)
const result = try r.peekDelimiterInclusive(delimiter);
^
/home/karthick/projects/zig-tcp/src/main.zig:42:64: 0x109d3c4 in stream_data (main.zig)
const data = client_reader.interface.takeDelimiterInclusive('\n') catch |e| {
^
/home/karthick/.local/zig/lib/std/Thread/Pool.zig:230:39: 0x1070abd in runFn (std.zig)
@call(.auto, func, closure.arguments);
^
/home/karthick/.local/zig/lib/std/Thread/Pool.zig:293:27: 0x10ed910 in worker (std.zig)
runnable.runFn(runnable, id);
^
what i understand from the message is // File descriptor used after closed.
can i know where am i closing the file descriptor stupidly coz all my closes are using defer.
thanks in advance for help and time teaching me.