Hi,
I’m a bit stuck getting my code to work the way I want.
Since I’m really new to Zig, and documentation (especially for the master branch) is sometimes missing and often outdated, I’m hoping for some help from the experts here.
What I’m trying to accomplish is the following:
I want to download (GET) a file’s content in chunks and afterwards put the chunks back together into the complete file. Additionally, I want to limit the number of parallel downloads. Whenever a single chunk download finishes, the next chunk should start downloading, until there are no chunks left and the whole body has been downloaded and written to a new local file. I know that’s a very common task, but it’s only the frame for a more specialized workflow in the final code. For now, though, I’m stuck with the basic task itself.
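The chunking step on its own can be done without tracking the last part’s size separately: a ceiling division gives the number of parts, and clipping each part’s end to the content length takes care of a shorter final part. A minimal sketch in plain Zig using only language builtins; `partCount` and `partRange` are hypothetical helper names, not std functions, and `end` is inclusive as in an HTTP `Range: bytes=start-end` header:

```zig
const std = @import("std");

/// Inclusive byte range, as in an HTTP `Range: bytes=start-end` header.
const ContentRange = struct {
    start: usize,
    end: usize,
};

/// Number of parts needed to cover `len` bytes (ceiling division).
fn partCount(len: usize, part_size: usize) usize {
    std.debug.assert(part_size > 0);
    return (len + part_size - 1) / part_size;
}

/// Range of part `index`; the last part is clipped to the content length.
fn partRange(len: usize, part_size: usize, index: usize) ContentRange {
    std.debug.assert(index < partCount(len, part_size));
    const start = index * part_size;
    const end_exclusive = @min(start + part_size, len);
    return .{ .start = start, .end = end_exclusive - 1 };
}
```

Looping `for (0..partCount(content.len, part_size)) |i|` over `partRange` should yield the same ranges as the `last_part_size` bookkeeping in the code below.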
For a different program in Rust, I accomplished this by using a Semaphore together with a Vec of Tokio tasks. Every task gets a permit as long as the semaphore’s maximum permit count isn’t reached. If there is no free permit, the task waits until an earlier task has finished. That works fine, but Rust’s tokio/Semaphore interface is very abstract, and a lot happens in the background that I don’t see, let alone understand when I look into the code.
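The part Tokio hides is fairly small. A semaphore is a permit counter guarded by a mutex, plus a condition variable that parks callers while the counter is zero; the task that took a permit hands it back when it finishes, which is what lets the next waiting task start. A thread-based sketch of that idea, assuming the `std.Thread.Mutex`/`std.Thread.Condition` API of Zig 0.14/0.15 (an illustration of the mechanism, not how `std.Io.Semaphore` is implemented):

```zig
const std = @import("std");

/// A counting semaphore: `acquire` blocks while no permits are left,
/// `release` returns one permit and wakes one blocked waiter.
const Semaphore = struct {
    mutex: std.Thread.Mutex = .{},
    cond: std.Thread.Condition = .{},
    permits: usize,

    fn acquire(self: *Semaphore) void {
        self.mutex.lock();
        defer self.mutex.unlock();
        // Re-check after every wake-up: another thread may have taken the permit.
        while (self.permits == 0) self.cond.wait(&self.mutex);
        self.permits -= 1;
    }

    fn release(self: *Semaphore) void {
        self.mutex.lock();
        defer self.mutex.unlock();
        self.permits += 1;
        self.cond.signal();
    }
};
```

A task would call `sem.acquire()` before its download and `defer sem.release()` right after it, so the permit is returned by the task that took it, even if the download fails.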
Thus, I thought using a std.Io.Semaphore together with some async/concurrent functions could make it work in Zig too. I have the following code, which is a simplified version of what I want to accomplish. It doesn’t actually download anything; it slices a string into parts and returns each part after a 2-second sleep, which should mimic the download time of bigger chunks well enough. The code runs fine and the file is put together correctly. But all tasks are executed in parallel, so the whole program only takes 2 seconds. With the semaphore’s maximum of 3 permits, I want this example to take about 8 seconds (10 chunks, at most 3 at a time, so 4 rounds of 2 seconds).
```zig
const std = @import("std");
const Io = std.Io;

pub fn main(init: std.process.Init) !void {
    const allocator: std.mem.Allocator = init.gpa;
    var threaded: Io.Threaded = .init(allocator, .{ .environ = init.minimal.environ });
    defer threaded.deinit();
    const io = threaded.io();

    // Some dumb blind text
    const content =
        \\Far far away, behind the word mountains, far from the countries Vokalia and
        \\Consonantia, there live the blind texts. Separated they live in Bookmarksgrove
        \\right at the coast of the Semantics, a large language ocean. A small river named
        \\Duden flows by their place and supplies it with the necessary regelialia. It is
        \\a paradisematic country, in which roasted parts of sentences fly into your
        \\mouth. Even the all-powerful Pointing has no control about the blind texts it is
    ;

    const ContentRange = struct {
        start: usize,
        end: usize,
    };

    const part_size = 50;

    // Calculate parts count
    var part_count = (content.len / part_size) + 1;
    var last_part_size = content.len % part_size;
    if (last_part_size == 0) {
        last_part_size = part_size;
        part_count -= 1;
    }

    // Create array of part ranges
    var ranges = std.ArrayList(ContentRange).empty;
    defer ranges.clearAndFree(allocator);

    // Add ranges to array
    for (0..part_count) |part| {
        const start = part * part_size;
        // INFO: Remember that the "end" index of an HTTP range request is inclusive,
        // while the "end" index of a byte/string slice is exclusive
        const end = if (part == part_count - 1)
            start + last_part_size - 1
        else
            (part + 1) * part_size - 1;
        try ranges.append(allocator, .{ .start = start, .end = end });
    }

    for (ranges.items) |r| {
        std.log.info("Range {d}-{d}: {s}", .{ r.start, r.end, content[r.start .. r.end + 1] }); // add one to end index since byte slices are exclusive
    }

    const start_time = std.Io.Timestamp.now(io, .awake);
    var local_file = try std.Io.Dir.createFile(.cwd(), io, "local_file", .{});

    const max_concurrency_count = 3;
    var semaphore: Io.Semaphore = .{ .permits = max_concurrency_count };
    var futures: std.ArrayList(Io.Future(anyerror!void)) = .empty;
    defer futures.clearAndFree(allocator);

    // TODO: Maybe a different implementation with Io.Group, Io.Queue or Io.Select
    // var futures: std.Io.Queue(Io.Future(!void)) = .init(&.{});
    for (ranges.items, 0..) |r, i| {
        std.log.debug("Start putting range {d} to futures", .{i});
        const chunk = content[r.start .. r.end + 1];
        try futures.append(allocator, io.async(downloadAndWrite, .{ io, &semaphore, &local_file, chunk, r.start }));
        semaphore.post(io);
        std.log.debug("Finished putting range {d} to futures, semaphore increased", .{i});
        // try futures.putOne(io, io.async(downloadAndWrite, .{ io, &semaphore, &local_file, chunk, r.start }));
    }

    for (futures.items) |*func| {
        try func.await(io);
        const now = Io.Timestamp.untilNow(start_time, io, .awake);
        std.log.debug("Completed part {d}ms after start", .{now.toMilliseconds()});
    }

    var buf: [1024]u8 = undefined;
    var stdout = std.Io.File.Writer.init(.stdout(), io, &buf);
    const stdout_writer = &stdout.interface;
    try stdout_writer.print("Finished\n", .{});
    try stdout_writer.flush(); // the writer is buffered; without a flush nothing is printed
}

// Fake a download, waiting 2s before returning the chunk.
// Then write the chunk to the newly created file at the correct position.
fn downloadAndWrite(io: Io, sema: *Io.Semaphore, file: *std.Io.File, chunk: []const u8, start: u64) anyerror!void {
    try sema.wait(io);
    try io.sleep(std.Io.Duration.fromMilliseconds(2000), .awake);
    try file.writePositionalAll(io, chunk, start);
}
```
Output (where I would expect more 2-second steps):
```
debug: Completed part 2000ms after start
debug: Completed part 2000ms after start
debug: Completed part 2000ms after start
debug: Completed part 2000ms after start
debug: Completed part 2000ms after start
debug: Completed part 2000ms after start
debug: Completed part 2001ms after start
debug: Completed part 2001ms after start
debug: Completed part 2001ms after start
debug: Completed part 2001ms after start
```
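The output fits the permit count: the semaphore starts with 3 permits, and the spawning loop calls `semaphore.post(io)` once per task, adding a permit per spawned task (13 in total for 10 tasks), so no task ever has to wait. At the same time, no task returns the permit it took. A sketch of the usual fix, using only the `std.Io` calls already in the code above (so it assumes the same master build): drop the `post` from the loop and let each task release its own permit with `defer` right after `wait`. File writing is left out, error handling is as simple as in the original, and `fakeDownload` is a hypothetical stand-in for `downloadAndWrite`:

```zig
const std = @import("std");
const Io = std.Io;

pub fn main(init: std.process.Init) !void {
    const allocator = init.gpa;
    var threaded: Io.Threaded = .init(allocator, .{ .environ = init.minimal.environ });
    defer threaded.deinit();
    const io = threaded.io();

    const chunk_count = 10;
    const max_concurrency_count = 3;
    // Start with 3 permits; only finishing tasks hand permits back.
    var semaphore: Io.Semaphore = .{ .permits = max_concurrency_count };

    var futures: std.ArrayList(Io.Future(anyerror!void)) = .empty;
    defer futures.clearAndFree(allocator);

    const start_time = Io.Timestamp.now(io, .awake);
    for (0..chunk_count) |i| {
        // No semaphore.post(io) here: posting from the spawning loop adds a
        // fresh permit per task, which is why every task started at once.
        try futures.append(allocator, io.async(fakeDownload, .{ io, &semaphore, i }));
    }
    for (futures.items) |*future| {
        try future.await(io);
        const elapsed = Io.Timestamp.untilNow(start_time, io, .awake);
        std.log.debug("Completed part {d}ms after start", .{elapsed.toMilliseconds()});
    }
}

fn fakeDownload(io: Io, sema: *Io.Semaphore, index: usize) anyerror!void {
    try sema.wait(io); // take a permit; blocks while all 3 are in use
    defer sema.post(io); // give it back when this task ends, even on error
    try io.sleep(Io.Duration.fromMilliseconds(2000), .awake);
    std.log.debug("Chunk {d} done", .{index});
}
```

Both changes matter: removing the loop’s `post` without adding `defer sema.post(io)` would let 3 chunks run and leave the other 7 waiting forever. With both in place, the run should take about 8 seconds, i.e. ceil(10 / 3) = 4 rounds of 2 seconds. The order in which waiting tasks receive a freed permit isn’t specified here, so the per-future timestamps may not rise in perfectly even steps.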
I also thought that std.Io.Group, std.Io.Select or std.Io.Queue might be better suited for this job, but documentation on those is sparse. Most examples online use something like std.Thread.Pool, which seems to be deprecated on master.
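A different technique that needs no semaphore at all: start exactly as many workers as the concurrency limit and have each one claim the next chunk index from a shared atomic counter until none are left, so the limit comes from the worker count alone. A sketch with plain `std.Thread` and `std.atomic.Value`, assuming their Zig 0.14/0.15 signatures; `@memcpy` stands in for the download plus positional write, and `Job`, `worker` and `reassemble` are hypothetical names:

```zig
const std = @import("std");

/// State shared by all workers: the next chunk to claim, plus input and output.
const Job = struct {
    next: std.atomic.Value(usize) = std.atomic.Value(usize).init(0),
    content: []const u8,
    output: []u8,
    part_size: usize,
};

/// Claims chunk indices until none are left, so each worker has at most one
/// chunk in flight at any time.
fn worker(job: *Job) void {
    const part_count = (job.content.len + job.part_size - 1) / job.part_size;
    while (true) {
        const index = job.next.fetchAdd(1, .monotonic);
        if (index >= part_count) return;
        const start = index * job.part_size;
        const end = @min(start + job.part_size, job.content.len); // exclusive
        // Stand-in for "download this range, then write it at offset `start`".
        @memcpy(job.output[start..end], job.content[start..end]);
    }
}

/// Copies `content` into `output` chunk by chunk using `worker_count` threads.
fn reassemble(content: []const u8, output: []u8, part_size: usize, comptime worker_count: usize) !void {
    std.debug.assert(output.len == content.len and part_size > 0);
    var job: Job = .{ .content = content, .output = output, .part_size = part_size };
    var threads: [worker_count]std.Thread = undefined;
    var spawned: usize = 0;
    // Join whatever was started, even if a later spawn fails; any workers
    // that did start still drain all remaining chunks before exiting.
    defer for (threads[0..spawned]) |t| t.join();
    while (spawned < worker_count) : (spawned += 1) {
        threads[spawned] = try std.Thread.spawn(.{}, worker, .{&job});
    }
}
```

Something like `try reassemble(content, output, 50, 3);` would then copy the sample text in 50-byte parts with 3 workers. Whether the same `worker` loop can simply be started three times via `io.async` on master, instead of `std.Thread.spawn`, is untested here.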
So I’m thankful for any idea or hint! Sorry for the long text, and sorry if I’m missing something obvious. Zig is still new to me, as is this low-level async/concurrent stuff (I know, “async is not concurrent”).
Thank you