Here is the full implementation I came up with. It probably has some unnecessary mutex contention and a few bugs, but it works for now.
//! The port is a thread-safe interface for interacting with an Ethernet port.
//! The port performs transactions (EtherCAT datagrams).
//! A transaction is a single EtherCAT datagram that travels in the EtherCAT ring.
//! It is sent, travels through the network (potentially modified by the subdevices), and is received.
//!
//! Callers are expected to follow the life cycle of a transaction:
//!
//! 1. `sendTransaction()`: immediately send a transaction, using a full Ethernet frame. Callers may use `sendTransactions()` to allow multiple transactions to be packed together into individual Ethernet frames (recommended).
//! 2. `continueTransaction()`: callers must call this repeatedly until the transaction is received.
//! 3. `releaseTransaction()`: callers must call this if `sendTransaction()` was successful.
//!
//! This interface is necessary because frames (and thus transactions) may be returned out of order
//! by the Ethernet interface. So the port must have access to the full list of transactions that are
//! currently pending.
//!
//! The `continueTransaction()` concept also allows single-threaded operation without an event loop.
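//!
//! A minimal single-transaction sketch (hypothetical; assumes `port` is an initialized
//! Port and `dg` is a prepared telegram.Datagram):
//!
//!     var transaction = Port.Transaction{
//!         .data = Port.TransactionDatagram.init(dg, null, null),
//!     };
//!     try port.sendTransaction(&transaction);
//!     defer port.releaseTransaction(&transaction);
//!     while (!(try port.continueTransaction(&transaction))) {}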
const std = @import("std");
const assert = std.debug.assert;
const logger = @import("root.zig").logger;
const nic = @import("nic.zig");
const telegram = @import("telegram.zig");
const wire = @import("wire.zig");
const Port = @This();
link_layer: nic.LinkLayer,
settings: Settings,
transactions: Transactions,
transactions_mutex: std.Thread.Mutex = .{},
pub const Settings = struct {
source_mac_address: u48 = 0xffff_ffff_ffff,
dest_mac_address: u48 = 0xABCD_EF12_3456,
};
pub const Transactions = std.DoublyLinkedList(TransactionDatagram);
pub const Transaction = Transactions.Node;
pub const TransactionDatagram = struct {
send_datagram: telegram.Datagram,
recv_datagram: telegram.Datagram,
done: bool = false,
check_wkc: ?u16 = null,
/// The datagram to send is send_datagram.
/// If recv_region is provided, the returned datagram's payload will be placed there.
/// If recv_region is null, the returned payload will be placed back into send_datagram.data.
/// When provided, recv_region must be the same length as send_datagram.data.
///
/// If check_wkc is non-null, the returned wkc must equal it for the data to be copied to
/// the recv region.
pub fn init(send_datagram: telegram.Datagram, recv_region: ?[]u8, check_wkc: ?u16) TransactionDatagram {
if (recv_region) |region| {
assert(send_datagram.data.len == region.len);
}
return TransactionDatagram{
.send_datagram = send_datagram,
.recv_datagram = telegram.Datagram{
.header = send_datagram.header,
.wkc = 0,
.data = recv_region orelse send_datagram.data,
},
.check_wkc = check_wkc,
};
}
};
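// A sketch of the recv_region option (illustrative; `dg` is some prepared
// telegram.Datagram with a 4-byte payload): the request payload stays intact and
// the reply is captured separately, accepted only when the working counter is 3.
//
//     var recv_buf: [4]u8 = undefined; // must be the same length as dg.data
//     const td = TransactionDatagram.init(dg, &recv_buf, 3);
//
// With recv_region == null, the reply payload overwrites dg.data in place.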
pub fn init(link_layer: nic.LinkLayer, settings: Settings) Port {
return Port{
.link_layer = link_layer,
.settings = settings,
.transactions = .{},
};
}
pub fn deinit(self: *Port) void {
assert(self.transactions.len == 0); // a transaction was leaked (never released)
}
/// Caller owns the responsibility to release the transactions (via releaseTransactions()) after successful return from this function.
pub fn sendTransactions(self: *Port, transactions: []Transaction) error{LinkError}!void {
// TODO: optimize to pack frames
var n_sent: usize = 0;
errdefer self.releaseTransactions(transactions[0..n_sent]);
for (transactions) |*transaction| {
try self.sendTransaction(transaction);
n_sent += 1;
}
}
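// A batch-usage sketch (hypothetical; `port` and prepared datagrams `dg0`/`dg1` assumed).
// Releasing is only the caller's job on success; on error, the errdefer above
// releases whatever was already sent.
//
//     var transactions = [_]Port.Transaction{
//         .{ .data = Port.TransactionDatagram.init(dg0, null, null) },
//         .{ .data = Port.TransactionDatagram.init(dg1, null, null) },
//     };
//     try port.sendTransactions(&transactions);
//     defer port.releaseTransactions(&transactions);
//     for (&transactions) |*t| {
//         while (!(try port.continueTransaction(t))) {}
//     }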
/// Send a transaction on the EtherCAT bus.
/// Caller owns the responsibility to release the transaction after successful return from this function.
/// Callers must take care to provide uniquely identifiable frames, through idx or other means.
/// See fn compareDatagramIdentity.
pub fn sendTransaction(self: *Port, transaction: *Transaction) error{LinkError}!void {
assert(transaction.data.done == false); // did you forget to release this transaction?
assert(transaction.data.send_datagram.data.len == transaction.data.recv_datagram.data.len);
// one datagram will always fit
const ethercat_frame = telegram.EtherCATFrame.init(&.{transaction.data.send_datagram}) catch unreachable;
var frame = telegram.EthernetFrame.init(
.{
.dest_mac = self.settings.dest_mac_address,
.src_mac = self.settings.source_mac_address,
.ether_type = .ETHERCAT,
},
ethercat_frame,
);
var out_buf: [telegram.max_frame_length]u8 = undefined;
// one datagram will always fit
const n_bytes = frame.serialize(null, &out_buf) catch |err| switch (err) {
error.NoSpaceLeft => unreachable,
};
const out = out_buf[0..n_bytes];
// We must append the transaction before sending,
// because the reply may be received from any thread as soon as the frame is out.
{
self.transactions_mutex.lock();
defer self.transactions_mutex.unlock();
self.transactions.append(transaction);
}
errdefer self.releaseTransaction(transaction);
// TODO: handle partial send error
_ = self.link_layer.send(out) catch return error.LinkError;
}
/// Fetch a frame by receiving bytes.
///
/// Returns immediately (does not block).
/// Returns true when the transaction has been received (done).
pub fn continueTransaction(self: *Port, transaction: *Transaction) error{LinkError}!bool {
if (self.isDone(transaction)) return true;
self.recvFrame() catch |err| switch (err) {
error.LinkError => {
return error.LinkError;
},
error.InvalidFrame => {},
};
return self.isDone(transaction);
}
fn isDone(self: *Port, transaction: *Transaction) bool {
self.transactions_mutex.lock();
defer self.transactions_mutex.unlock();
return transaction.data.done;
}
fn recvFrame(self: *Port) !void {
var buf: [telegram.max_frame_length]u8 = undefined;
var frame_size: usize = 0;
frame_size = self.link_layer.recv(&buf) catch |err| switch (err) {
error.WouldBlock => return,
else => {
logger.err("Socket error: {}", .{err});
return error.LinkError;
},
};
if (frame_size == 0) return;
if (frame_size > telegram.max_frame_length) return error.InvalidFrame;
assert(frame_size <= telegram.max_frame_length);
const bytes_recv: []const u8 = buf[0..frame_size];
var frame = telegram.EthernetFrame.deserialize(bytes_recv) catch |err| {
logger.info("Failed to deserialize frame: {}", .{err});
return;
};
for (frame.ethercat_frame.datagrams().slice()) |datagram| {
self.findPutDatagramLocked(datagram);
}
}
/// Match a received datagram against the pending transactions; on a match, record the
/// result and mark the transaction done. Takes the transactions mutex.
fn findPutDatagramLocked(self: *Port, datagram: telegram.Datagram) void {
self.transactions_mutex.lock();
defer self.transactions_mutex.unlock();
var current: ?*Transaction = self.transactions.first;
while (current) |node| : (current = node.next) {
if (node.data.done) continue;
if (compareDatagramIdentity(datagram, node.data.send_datagram)) {
defer node.data.done = true;
node.data.recv_datagram.header = datagram.header;
node.data.recv_datagram.wkc = datagram.wkc;
// memcpy can be skipped for non-read commands
switch (datagram.header.command) {
.APRD,
.APRW,
.ARMW,
.BRD,
.BRW,
.FPRD,
.FPRW,
.FRMW,
.LRD,
.LRW,
=> {
if (node.data.check_wkc == null or node.data.check_wkc.? == datagram.wkc) {
@memcpy(node.data.recv_datagram.data, datagram.data);
}
},
.APWR, .BWR, .FPWR, .LWR, .NOP => {},
_ => {},
}
}
// we intentionally do not break here since we want to
// handle idx collisions gracefully by just writing to all of them
}
}
/// Returns true when the datagrams have the same identity (command, idx, address, and length).
fn compareDatagramIdentity(first: telegram.Datagram, second: telegram.Datagram) bool {
if (first.header.command != second.header.command) return false;
if (first.header.idx != second.header.idx) return false;
switch (first.header.command) {
.APRD,
.APRW,
.APWR,
.ARMW,
.BRD,
.BRW,
.BWR,
=> if (first.header.address.position.offset != second.header.address.position.offset) return false,
.FPRD,
.FPRW,
.FPWR,
.FRMW,
.LRD,
.LRW,
.LWR,
.NOP,
=> if (first.header.address.logical != second.header.address.logical) return false,
_ => return false,
}
if (first.header.length != second.data.len) return false;
if (first.data.len != second.data.len) return false;
return true;
}
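// A hypothetical collision scenario: two in-flight BRD datagrams sharing idx == 0,
// offset == 0, and the same length are indistinguishable here, so a single reply
// would complete both transactions (findPutDatagramLocked intentionally writes to
// every match). Callers should hand out unique idx values to avoid this.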
/// Release transactions previously registered via sendTransactions().
pub fn releaseTransactions(self: *Port, transactions: []Transaction) void {
self.transactions_mutex.lock();
defer self.transactions_mutex.unlock();
for (transactions) |*transaction| {
self.transactions.remove(transaction);
transaction.data.done = false; // TODO: reevaluate if this makes sense
}
}
/// Release a transaction previously registered via sendTransaction().
pub fn releaseTransaction(self: *Port, transaction: *Transaction) void {
self.transactions_mutex.lock();
defer self.transactions_mutex.unlock();
self.transactions.remove(transaction);
transaction.data.done = false; // TODO: reevaluate if this makes sense
}