I’ll just share a snippet if thats okay.
I don’t actually use two of the buffers from the Bufs struct. I found that just removing those speeds up my program dramatically. I thought it could maybe be due to zig setting the memory to undefined with runtime safety, so I tried to turn off runtime safety for the create/destroy of that structure which didn’t help at all.
I thought that having those buffers there should have near zero impact because it is in memory pool. But it seems that is what is causing my slowdowns.
fn runProxy(proxy_listener: *net.Server, counter: *Counter, gpa: std.mem.Allocator, srv_address: net.UnixAddress) void {
var io_ev_rt = zio.Runtime.init(gpa, .{ .executors = .exact(1), .thread_pool = .{ .max_threads = 0 } }) catch @panic("OOM");
defer io_ev_rt.deinit();
const io_ev = io_ev_rt.io();
var my_srv: MyProxy = .{ .counter = counter, .proxy_listener = proxy_listener, .io = io_ev, .tasks = .init, .srv_address = srv_address, .pool = .empty, .gpa = gpa };
while (true) {
my_srv.accept();
}
}
const MyProxy = struct {
const buf_size = 1024 * 32;
const Bufs = struct {
// If i delete the unused buffers from this struct, I get huge speedup in debug and ReleaseSafe
cl_read: [buf_size]u8,
srv_read: [buf_size]u8,
cl_write: [buf_size]u8,
srv_write: [buf_size]u8,
};
const BufsPool = std.heap.MemoryPool(Bufs);
io: Io,
proxy_listener: *net.Server,
counter: *Counter,
tasks: Io.Group,
srv_address: net.UnixAddress,
pool: BufsPool,
gpa: std.mem.Allocator,
fn accept(self: *MyProxy) void {
const conn = self.proxy_listener.accept(self.io) catch |err| {
log.warn("Accept err {t}", .{err});
return;
};
self.tasks.async(self.io, handleConnection, .{ self, conn });
}
fn handleConnection(self: *MyProxy, cl_connection: net.Stream) void {
defer cl_connection.close(self.io);
var srv_connection = self.srv_address.connect(self.io) catch |err| {
log.err("Connect error {t}", .{err});
return;
};
defer srv_connection.close(self.io);
var bufs = self.pool.create(self.gpa) catch @panic("OOM");
defer self.pool.destroy(bufs);
var cl_reader = cl_connection.reader(self.io, &.{});
var cl_writer = cl_connection.writer(self.io, &bufs.cl_write);
var srv_reader = srv_connection.reader(self.io, &.{});
var srv_writer = srv_connection.writer(self.io, &bufs.srv_write);
var fut = self.io.concurrent(streamUntilClose, .{ "srv->cl", &srv_reader.interface, &cl_writer.interface }) catch @panic("NO");
streamUntilClose("cl->srv", &cl_reader.interface, &srv_writer.interface);
cl_connection.shutdown(self.io, .recv) catch return;
srv_connection.shutdown(self.io, .send) catch return;
fut.await(self.io);
}
fn streamUntilClose(name: []const u8, reader: *std.Io.Reader, writer: *std.Io.Writer) void {
while (true) {
_ = reader.stream(writer, .unlimited) catch |err| switch (err) {
error.EndOfStream => {
log.debug("Stream {s} done", .{name});
return;
},
else => {
log.err("Stream err {t}", .{err});
return;
},
};
writer.flush() catch |err| {
log.err("Flush err {t}", .{err});
return;
};
}
}
};