Should I use a linked list?

Background

  1. I want multiple threads to share an ethernet port.
  2. Each thread performs “transactions” on the ethernet port.
  3. A transaction has a unique ID.
  4. A transaction has the following steps:
    1. send data (containing transaction ID)
    2. recv data (containing transaction ID) (sometimes timeout)
  5. The sent data is always the same size as the received data.
  6. The sent data is always < 1500 bytes long (a single ethernet frame).
  7. There are no adversaries on the network.

Existing Solution

My existing solution is basically a memory pool. I have a Port struct that contains an array of 256 ethernet frames and associated status of each frame:

pub const Port = struct {
    recv_frames_status_mutex: std.Thread.Mutex = .{},
    recv_frames: [max_frames]*telegram.EtherCATFrame = undefined,
    recv_frames_status: [max_frames]FrameStatus = [_]FrameStatus{FrameStatus.available} ** max_frames,
    pub const max_frames: u9 = 256;
}

Each thread calls sendRecvFrame() which handles reserving a transaction id, continuing any existing transactions, and returning when that threads transaction is finished:

pub fn sendRecvFrame(
    self: *Port,
    send_frame: *telegram.EtherCATFrame,
    recv_frame_ptr: *telegram.EtherCATFrame,
    timeout_us: u32,
) SendRecvError!void {
    var timer = std.time.Timer.start() catch |err| switch (err) {
        error.TimerUnsupported => unreachable,
    };
    var idx: u8 = undefined;
    while (timer.read() < @as(u64, timeout_us) * std.time.ns_per_us) {
        idx = self.claimTransaction() catch |err| switch (err) {
            error.NoTransactionAvailable => continue,
        };
        break;
    } else {
        return error.TransactionContention;
    }
    defer self.releaseTransaction(idx);

    try self.sendTransaction(idx, send_frame, recv_frame_ptr);

    while (timer.read() < @as(u64, timeout_us) * 1000) {
        if (try self.continueTransaction(idx)) {
            return;
        }
    } else {
        return error.RecvTimeout;
    }
}

On receive, the transactions may be returned in any order. I use the transaction ID contained in the data to put the received data back in the right place.
error.TransactionContention can happen. It is possible to exhaust my 256 available transactions. Handling this error everywhere is annoying.

Desired Behavior

  1. Infailable insertion. I want to always be able to send a transaction. No memory allocation failure, no resource exhaustion. I want to delete error.TransactionContention.

Proposed Solution

I noticed that each thread already knows how much memory is required to complete a transaction, each thread can already provide the data to send, and the memory location to put the received data into.

I can extend this idea to allow each thread to not only provide the send and recv frame buffers, but the next and prev pointer for a linked list. So each thread provides the memory for a node in the linked list.

When each thread calls continueTransaction, the linked list is traversed by each thread (protected by a mutex). If the transaction is received, the data is put in the recv buffer.

pub const Transaction = struct {
    send_buffer: *const EthernetFrame, // contains transaction ID
    recv_buffer: *EthernetFrame,
    next: ?*Transaction,
    prev: ?*Transaction,
}

It is the responsibilty of the calling thread to call releaseTransaction to pop the node from the list, when continueTransaction indicates the transaction has completed or the calling thread determines a timeout has been exceeded.

The list is doubly-linked to ensure O(1) pop.

In this manner, my Port struct can support unlimited transactions (as long as there is a unique transaction ID available) (which for details I wont go into, I can guarantee). My port abstraction will never return errors, greatly simplifying the code in all my threads.

Do you think this is a good idea? And I missing anything?
Since transactions ID’s are unique, am I missing some sort of hash-map-like optimization? Is there some sort of linked-list / hashmap data structure I should use instead?

1 Like

So the ideia is to create a queue for the transactions requests?
In this case, the queue could be any data structure, even an ArrayList. But yes the linked list would be the best for removing elements in the middle.
Just note that you’re not solving contention, you’re just pushing it somewhere else.

This is a spin loop to acquire the transaction. When you switch to the queue, you’re going to use a mutex. If you acquire the mutex in a blocking manner, you remove the error, but you’re not solving contention, you’re just sleeping through it. If you try acquiring the mutex in a non-blocking manner, you’re back where you started.
Whether the queue-less spinloop is better or worse than a queue that blocks, I can’t tell, it depends on how much work claimTransaction is doing, and how much contention there is.

This a spin loop to receive a response. It seems that the transaction begins and ends inside this function. In that case, can’t you just reserve a transaction for each thread?
Another option would be reserve a range of transaction IDs for each thread.

Contention is perhaps the wrong word to use.

In the existing implementation, I have a memory pool of transactions. If they are all used when a thread is trying to claim one, I returned error.TransactionContention. Which is analogous of error.OutOfMemory.

In the new implementation, each thread provides the memory necessary to perform the transaction on the call to sendTransaction. The calling thread need only maintain the lifetime of the provided node to complete the transaction, or call releaseTransaction if a timeout expired. The calling thread will call continueTransaction over and over (in a spin loop). There will certainly be a lot of lock contention (the mutex protecting the linked list), but thats fine.

I could certainly use a queue (ArrayList), but then I have to handle error.OutOfMemory in all my threads, when I could avoid that by using stack allocated memory in each thread and provide that to the port.

I think your approach makes sense, you could probably reduce lock contention and pointer chasing by splitting your linked list into many smaller ones and always appending to/searching in lists[hash(transaction_id) % list_count] akin to a sharded hash map. You could then also protect every one of those lists with a separate mutex.

1 Like

I feel the need to toot my own horn here:

const zelda = @import("zelda");

pub const Transaction = struct {
    send_buffer: *const EthernetFrame, // contains transaction ID
    recv_buffer: *EthernetFrame,
    next: ?*Transaction,
    prev: ?*Transaction,

    pub usingnamespace zelda.doublyLinkedList(Transaction, .next, .prev);
}

Now your linked list bugs are my linked list bugs instead!

Yeah yeah, I know. usingnamespace. You can of course just obtain the namespace and manually copy over the parts you intend to use. Manual labor is always possible in the absence of automation.

4 Likes

Here is the full implementation I came up with. It probably has some unnecessary mutex contention and a few bugs, but it works for now.

//! The port is a thread-safe interface for interacting with an ethernet port.
//! The port performs transactions (ethercat datagrams).
//! A transaction is a single ethercat datagram that travels in the ethercat ring.
//! It is sent, travels through the network (potentially modified by the subdevices), and is received.
//!
//! Callers are expected to follow the life cycle of a transaction:
//!
//! 1. `sendTransaction()`: immediately send a transaction, using a full ethernet frame. Callers may use sendTransactions() to allow multiple transactions to be packed into individual ethernet frames (recommended).
//! 2. `continueTransaction()`: callers must call this repeatedly until the transaction is received.
//! 3. `releaseTransaction()`: callers must call this if sendTransaction() is successful.
//!
//! This interface is necessary because frames (and thus transactions) may be returned out of order
//! by the ethernet interface. So the port must have access to the full list of transactions that are currently
//! pending.
//!
//! The `continueTransaction()` concept also allows single-threaded operation without an event loop.

const std = @import("std");
const assert = std.debug.assert;

const logger = @import("root.zig").logger;
const nic = @import("nic.zig");
const telegram = @import("telegram.zig");
const wire = @import("wire.zig");

const Port = @This();

link_layer: nic.LinkLayer,
settings: Settings,
transactions: Transactions,
transactions_mutex: std.Thread.Mutex = .{},

pub const Settings = struct {
    source_mac_address: u48 = 0xffff_ffff_ffff,
    dest_mac_address: u48 = 0xABCD_EF12_3456,
};

pub const Transactions = std.DoublyLinkedList(TransactionDatagram);
pub const Transaction = Transactions.Node;

pub const TransactionDatagram = struct {
    send_datagram: telegram.Datagram,
    recv_datagram: telegram.Datagram,
    done: bool = false,
    check_wkc: ?u16 = null,

    /// The datagram to send is send_datagram.
    /// If recv_region is provided, the returned datagram.data payload will be placed there.
    /// If recv_region is null, the returned datagram payload will be placed back in the send_datagram.data.
    /// The recv_region, when provided, must be same length as send_datagram.data.
    ///
    /// If check_wkc is non-null, the returned wkc will be checked to be equal before copying the data to
    /// the recv region.
    pub fn init(send_datagram: telegram.Datagram, recv_region: ?[]u8, check_wkc: ?u16) TransactionDatagram {
        if (recv_region) |region| {
            assert(send_datagram.data.len == region.len);
        }
        return TransactionDatagram{
            .send_datagram = send_datagram,
            .recv_datagram = telegram.Datagram{
                .header = send_datagram.header,
                .wkc = 0,
                .data = recv_region orelse send_datagram.data,
            },
            .check_wkc = check_wkc,
        };
    }
};

pub fn init(link_layer: nic.LinkLayer, settings: Settings) Port {
    return Port{
        .link_layer = link_layer,
        .settings = settings,
        .transactions = .{},
    };
}

pub fn deinit(self: *Port) void {
    assert(self.transactions.len == 0); // leaked transaction;
}

/// Caller owns responsibilty to release transaction after successful return from this function.
pub fn sendTransactions(self: *Port, transactions: []Transaction) error{LinkError}!void {
    // TODO: optimize to pack frames
    var n_sent: usize = 0;
    errdefer self.releaseTransactions(transactions[0..n_sent]);
    for (transactions) |*transaction| {
        try self.sendTransaction(transaction);
        n_sent += 1;
    }
}

/// Send a transaction with the ethercat bus.
/// Caller owns responsibilty to release transaction after successful return from this function.
/// Callers must take care to provide uniquely identifiable frames, through idx or other means.
/// See fn compareDatagramIdentity.
pub fn sendTransaction(self: *Port, transaction: *Transaction) error{LinkError}!void {
    assert(transaction.data.done == false); // forget to release transaction?
    assert(transaction.data.send_datagram.data.len == transaction.data.recv_datagram.data.len);
    // one datagram will always fit
    const ethercat_frame = telegram.EtherCATFrame.init(&.{transaction.data.send_datagram}) catch unreachable;
    var frame = telegram.EthernetFrame.init(
        .{
            .dest_mac = self.settings.dest_mac_address,
            .src_mac = self.settings.source_mac_address,
            .ether_type = .ETHERCAT,
        },
        ethercat_frame,
    );
    var out_buf: [telegram.max_frame_length]u8 = undefined;

    // one datagram will always fit
    const n_bytes = frame.serialize(null, &out_buf) catch |err| switch (err) {
        error.NoSpaceLeft => unreachable,
    };
    const out = out_buf[0..n_bytes];

    // We need to append the transaction before we send.
    // Because we may recv from any thread.
    {
        self.transactions_mutex.lock();
        defer self.transactions_mutex.unlock();
        self.transactions.append(transaction);
    }
    errdefer self.releaseTransaction(transaction);
    // TODO: handle partial send error
    _ = self.link_layer.send(out) catch return error.LinkError;
}

/// fetch a frame by receiving bytes
///
/// returns immediatly
/// returns true when transaction is received (done)
pub fn continueTransaction(self: *Port, transaction: *Transaction) error{LinkError}!bool {
    if (self.isDone(transaction)) return true;
    self.recvFrame() catch |err| switch (err) {
        error.LinkError => {
            return error.LinkError;
        },
        error.InvalidFrame => {},
    };
    return self.isDone(transaction);
}

fn isDone(self: *Port, transaction: *Transaction) bool {
    self.transactions_mutex.lock();
    defer self.transactions_mutex.unlock();
    return transaction.data.done;
}

fn recvFrame(self: *Port) !void {
    var buf: [telegram.max_frame_length]u8 = undefined;
    var frame_size: usize = 0;

    frame_size = self.link_layer.recv(&buf) catch |err| switch (err) {
        error.WouldBlock => return,
        else => {
            logger.err("Socket error: {}", .{err});
            return error.LinkError;
        },
    };
    if (frame_size == 0) return;
    if (frame_size > telegram.max_frame_length) return error.InvalidFrame;

    assert(frame_size <= telegram.max_frame_length);
    const bytes_recv: []const u8 = buf[0..frame_size];

    var frame = telegram.EthernetFrame.deserialize(bytes_recv) catch |err| {
        logger.info("Failed to deserialize frame: {}", .{err});
        return;
    };
    for (frame.ethercat_frame.datagrams().slice()) |datagram| {
        self.findPutDatagramLocked(datagram);
    }
}

fn findPutDatagramLocked(self: *Port, datagram: telegram.Datagram) void {
    self.transactions_mutex.lock();
    defer self.transactions_mutex.unlock();

    var current: ?*Transaction = self.transactions.first;
    while (current) |node| : (current = node.next) {
        if (node.data.done) continue;
        if (compareDatagramIdentity(datagram, node.data.send_datagram)) {
            defer node.data.done = true;
            node.data.recv_datagram.header = datagram.header;
            node.data.recv_datagram.wkc = datagram.wkc;
            // memcpy can be skipped for non-read commands
            switch (datagram.header.command) {
                .APRD,
                .APRW,
                .ARMW,
                .BRD,
                .BRW,
                .FPRD,
                .FPRW,
                .FRMW,
                .LRD,
                .LRW,
                => {
                    if (node.data.check_wkc == null or node.data.check_wkc.? == datagram.wkc) {
                        @memcpy(node.data.recv_datagram.data, datagram.data);
                    }
                },
                .APWR, .BWR, .FPWR, .LWR, .NOP => {},
                _ => {},
            }
        }
        // we intentionally do not break here since we want to
        // handle idx collisions gracefully by just writing to all of them
    }
}

/// returns true when the datagrams are the same
fn compareDatagramIdentity(first: telegram.Datagram, second: telegram.Datagram) bool {
    if (first.header.command != second.header.command) return false;
    if (first.header.idx != second.header.idx) return false;
    switch (first.header.command) {
        .APRD,
        .APRW,
        .APWR,
        .ARMW,
        .BRD,
        .BRW,
        .BWR,
        => if (first.header.address.position.offset != second.header.address.position.offset) return false,
        .FPRD,
        .FPRW,
        .FPWR,
        .FRMW,
        .LRD,
        .LRW,
        .LWR,
        .NOP,
        => if (first.header.address.logical != second.header.address.logical) return false,
        _ => return false,
    }
    if (first.header.length != second.data.len) return false;
    if (first.data.len != second.data.len) return false;
    return true;
}

pub fn releaseTransactions(self: *Port, transactions: []Transaction) void {
    self.transactions_mutex.lock();
    defer self.transactions_mutex.unlock();
    for (transactions) |*transaction| {
        self.transactions.remove(transaction);
        transaction.data.done = false; // TODO: reevaluate if this makes sense
    }
}

pub fn releaseTransaction(self: *Port, transaction: *Transaction) void {
    self.transactions_mutex.lock();
    defer self.transactions_mutex.unlock();
    self.transactions.remove(transaction);
    transaction.data.done = false; // TODO: reevaluate if this makes sense
}