How to read from a file and perform a concurrent task

I’m getting started with async I/O and trying to use it in a project, which works as follows:

There’s a file hosts.txt which contains one IPv4 address on each line and looks like:

1.2.3.4
5.6.7.8
9.10.11.12

There’s a function scanAll, which reads hosts.txt and calls scanHost for each line, passing the IP and a port (the port is the same for all IPs).

Without async, the code looks like:

const std = @import("std");

const PortStatus = enum {
    open,
    closed,
    unknown,
};

fn scan(io: std.Io, ipv4: []const u8, port: u16) !PortStatus {
    const address = try std.Io.net.IpAddress.parseIp4(ipv4, port);

    const stream = std.Io.net.IpAddress.connect(
        address,
        io,
        .{ .mode = .stream, .protocol = .tcp },
    ) catch |err| {
        return switch (err) {
            error.ConnectionRefused => .closed,
            else => .unknown,
        };
    };

    defer stream.close(io);
    return .open;
}

// Scanning a single host on a single port is a supported feature in its own right, not just a helper.
pub fn scanHost(io: std.Io, ipv4: []const u8, port: u16) !void {
    const port_status = try scan(io, ipv4, port);

    if (port_status == .open) {
        std.debug.print("Port {d} is open on host {s}\n", .{ port, ipv4 });
    } else {
        std.debug.print("Port {d} is not open on host {s}\n", .{ port, ipv4 });
    }
}

pub fn scanAll(io: std.Io, port: u16) !void {
    const input = try std.Io.Dir.cwd().openFile(io, "./hosts.txt", .{ .mode = .read_only });
    defer input.close(io);

    var buffer: [512]u8 = undefined;
    var reader = input.reader(io, &buffer);
    const file = &reader.interface;

    while (try file.takeDelimiter('\n')) |line| {
        try scanHost(io, line, port);
    }
}

and outputs the following (in the same order as hosts.txt):

Port 80 is open on host 1.2.3.4
Port 80 is open on host 5.6.7.8
Port 80 is open on host 9.10.11.12

Given that the output may appear in any order, how can I use concurrency (and/or parallelism) here?

Where specifically are you looking to use async in the provided example?

Within the scanAll function

Edit: more specifically, the while loop in scanAll is where I think concurrency could help.

One way to try it would be to add an io parameter to that function and replace the call in the loop body with something like group.async(…), then await the group after the loop.

I tried:

pub fn scanAll(io: std.Io, port: u16) !void {
    const input = try std.Io.Dir.cwd().openFile(io, "./hosts.txt", .{ .mode = .read_only });
    defer input.close(io);

    var buffer: [512]u8 = undefined;
    var reader = input.reader(io, &buffer);
    const file = &reader.interface;

    var group: std.Io.Group = .init;

    while (try file.takeDelimiter('\n')) |line| {
        group.async(io, scanHost, .{ io, line, port });
    }

    try group.await(io);
}

but it fails to compile with:

/home/mishra/source/zig/build/stage3/lib/zig/std/Io.zig:1064:24: error: expected type 'error{Canceled}!void', found '@typeInfo(@typeInfo(@TypeOf(scanner.scanHost)).@"fn".return_type.?).error_union.error_set!void'
return @call(.auto, function, args_casted.*);
^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
/home/mishra/source/zig/build/stage3/lib/zig/std/Io.zig:1064:24: note: 'error.Overflow' not a member of destination error set
/home/mishra/source/zig/build/stage3/lib/zig/std/Io.zig:1064:24: note: 'error.InvalidCharacter' not a member of destination error set
/home/mishra/source/zig/build/stage3/lib/zig/std/Io.zig:1064:24: note: 'error.InvalidEnd' not a member of destination error set
/home/mishra/source/zig/build/stage3/lib/zig/std/Io.zig:1064:24: note: 'error.Incomplete' not a member of destination error set
/home/mishra/source/zig/build/stage3/lib/zig/std/Io.zig:1064:24: note: 'error.NonCanonical' not a member of destination error set
/home/mishra/source/zig/build/stage3/lib/zig/std/Io.zig:1062:59: note: function return type declared here
fn start(context: *const anyopaque) Cancelable!void {
~~~~~~~~~~^~~~~
referenced by:
async__anon_31608: /home/mishra/source/zig/build/stage3/lib/zig/std/Io.zig:1067:84
scanAll: src/scanner.zig:119:20
6 reference(s) hidden; use '-freference-trace=8' to see all reference

(I also tried it with defer group.cancel(io) and with try group.async ..., but I get a similar error.)

It’s telling you that the function you pass to Group.async cannot have those errors in its return type, so you need to handle them inside your function.
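Handling them can be as small as a wrapper that catches the error and reports it. The sketch below is an assumption of how that could look, not code from this thread: `reportingTask` is a hypothetical generic helper (not part of std) that turns a scan function like scanHost into a task returning plain `void`, which coerces to the `Cancelable!void` that Group.async expects (the `fn start(...) Cancelable!void` in the error above).

```zig
const std = @import("std");

/// Hypothetical helper (not part of std): wraps a scan function such as the
/// question's scanHost in a task whose return type is plain void.
/// Group.async only accepts functions whose errors fit in error{Canceled},
/// so any error from `scanFn` is printed here instead of being returned.
fn reportingTask(comptime scanFn: anytype) fn (std.Io, []const u8, u16) void {
    return struct {
        fn run(io: std.Io, ipv4: []const u8, port: u16) void {
            scanFn(io, ipv4, port) catch |err| {
                std.debug.print("Could not scan host {s}: {s}\n", .{ ipv4, @errorName(err) });
            };
        }
    }.run;
}
```

With it, the loop body in scanAll would become `group.async(io, reportingTask(scanHost), .{ io, line, port });`, while scanHost keeps its error-returning signature for single-host scans. A plain, non-generic wrapper function that calls scanHost directly works just as well.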


@alanza is right: the compile error is because Group does not allow your tasks to return values or errors, with the single exception of error.Canceled.

You also have another problem: the slice returned by takeDelimiter points into the reader’s buffer, so reading the next line may overwrite a previous line’s data while a concurrent task is still using it. This won’t be a problem if your buffer is big enough to hold the whole file, which may be the case.
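One way around it, sketched under the assumption that the lines simply need to outlive later reads: copy each line into memory that stays alive until the group has been awaited, for example an arena. `readOwnedLines` is a hypothetical helper for illustration, not a std function.

```zig
const std = @import("std");

/// Hypothetical helper (not part of std): drains `reader` line by line and
/// copies each line into `arena`. The copies stay valid after later reads,
/// unlike the slices returned by takeDelimiter, which point into the
/// reader's buffer.
fn readOwnedLines(arena: std.mem.Allocator, reader: *std.Io.Reader) ![]const []const u8 {
    var lines: std.ArrayList([]const u8) = .empty;
    while (try reader.takeDelimiter('\n')) |line| {
        // dupe copies the bytes out of the reader's buffer.
        try lines.append(arena, try arena.dupe(u8, line));
    }
    // No deinit here: the list and the copies are freed together with the arena.
    return lines.items;
}
```

In scanAll this would mean creating a `std.heap.ArenaAllocator` that is only deinitialized after `group.await(io)` returns, calling `readOwnedLines(arena.allocator(), file)`, and then spawning one group task per returned line. Reading everything up front also removes any dependence on the buffer size.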

I had a very similar question a few days ago, since I’m still learning Zig.

Functions run in a group can only return Io.Cancelable (that is, error.Canceled) as an error, if I got it right. Thus, you have to catch all other errors. That’s my current solution (simplified); maybe it helps understanding:


Thanks @alanza and @lukeflo!

@vulpesx Yes, the buffer is big enough for the entire file.