Record of Learning Uring
What is uring?
[Week 1]
SQ: submission queue, built on a circular ring
CQ: completion queue, built on a circular ring
SQE/CQE: the entries of the SQ/CQ
In the Linux design, the SQ ring stores only the indices of SQEs, while the CQ stores whole entries.
Submission:
- Put an SQE into the SQE array, record its index in the SQ ring, update the SQ tail.
- Repeat for multiple tasks.
Completion:
- The kernel completes a task, writes a CQE, and updates the CQ tail.
- The user consumes the CQE and updates the CQ head.
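As a mental model, here is a toy, single-process sketch of the two rings. All names (Sqe, Cqe, Rings, submit, consume) are illustrative, not the kernel ABI; in the real io_uring the kernel side runs in the kernel and head/tail updates use atomics with memory barriers.
struct Sqe { opcode: u8, fd: i32, user_data: u64 }
struct Cqe { user_data: u64, result: i32 }

struct Rings {
    sqes: Vec<Sqe>,  // the SQE array; the SQ ring stores indices into it
    sq: Vec<u32>,    // submission ring: indices only
    sq_tail: usize,  // advanced by userspace when submitting
    cq: Vec<Cqe>,    // completion ring: whole entries
    cq_tail: usize,  // advanced by the kernel when completing
    cq_head: usize,  // advanced by userspace when consuming
}

impl Rings {
    // Userspace submits: fill SQE slot `i`, record i in the ring, bump the tail.
    // (The kernel consuming the SQ and advancing its head is not modeled here.)
    fn submit(&mut self, i: u32, sqe: Sqe) {
        self.sqes[i as usize] = sqe;
        let mask = self.sq.len() - 1; // ring sizes are powers of two
        self.sq[self.sq_tail & mask] = i;
        self.sq_tail += 1;
    }

    // Userspace consumes one CQE, if any, and advances the CQ head.
    fn consume(&mut self) -> Option<&Cqe> {
        if self.cq_head == self.cq_tail {
            return None; // CQ is empty
        }
        let idx = self.cq_head & (self.cq.len() - 1);
        self.cq_head += 1;
        Some(&self.cq[idx])
    }
}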
What is a lock-free ring buffer?
[Week 2]
Reserve-commit: reserve space before actually modifying it.
struct Slot<T> {
    /// Tracks which lap this slot was last written/read on.
    stamp: AtomicUsize,
    /// The payload; uninitialized until a push writes it.
    value: UnsafeCell<MaybeUninit<T>>,
}
Each slot holds the data plus a stamp used for atomic synchronization.
/// one_lap = mark_bit * 2 = 0b10...0; a head/tail value is packed as
/// lap + idx (plus an optional mark bit), i.e. lap + idx = real_idx.
one_lap: usize,
/// mark_bit = 0b01...0, a power of two covering the capacity, so that
/// mark_bit - 1 = 0b00...011...1 works as the index mask (modulo).
mark_bit: usize,
Thus idx & (mark_bit - 1) extracts the index within the buffer, while
idx & !(one_lap - 1) clears the index and mark bits to obtain the lap,
i.e. how many times the ring has wrapped around.
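A worked example, assuming mark_bit is the smallest power of two greater than the capacity (crossbeam-channel's convention), here with capacity 7:
// mark_bit = (7 + 1).next_power_of_two() = 8  = 0b0_1000
// one_lap  = mark_bit * 2                 = 16 = 0b1_0000
let (mark_bit, one_lap) = (8usize, 16usize);
let tail = 21usize; // 0b1_0101
assert_eq!(tail & (mark_bit - 1), 5);  // index 5 within the buffer
assert_eq!(tail & !(one_lap - 1), 16); // one full lap has completed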
Consider a push operation. We first compute the index (the position within the buffer) and the lap (how many times the tail has wrapped around):
let index = tail & (self.mark_bit - 1);
let lap = tail & !(self.one_lap - 1);
let new_tail = if index + 1 < self.buffer.len() {
// index + 1 < len means index is in the same lap.
tail + 1
} else {
lap.wrapping_add(self.one_lap)
};
Then we load the slot at that index and check whether its stamp equals the tail. If they match, the tail we read while preparing the push is still current, so we try to compare-and-exchange the tail forward to new_tail and, on success, write the value into the slot:
// Inspect the slot at `index` and load its stamp.
let slot = &self.buffer[index];
let stamp = slot.stamp.load(Ordering::Acquire);
if tail == stamp {
// Try moving the tail.
match self.tail.compare_exchange_weak(
tail,
new_tail,
Ordering::SeqCst,
Ordering::Relaxed,
) {
Ok(_) => {
// Write the value into the slot and update the stamp.
slot.value.with_mut(|slot| unsafe {
slot.write(MaybeUninit::new(value));
});
slot.stamp.store(tail + 1, Ordering::Release);
return Ok(());
}
Err(t) => {
tail = t;
}
}
...
}
Otherwise, if the stamp is exactly one lap behind tail + 1, the value written on the previous lap has not been consumed yet, so the queue may be full. To obtain a consistent snapshot of head and tail before reporting the full condition to the error handler on Slot<T>, we issue full_fence() and then call the fallback:
else if stamp.wrapping_add(self.one_lap) == tail + 1 {
crate::full_fence();
value = fail(value, tail, new_tail, slot)?;
#[cfg(loom)]
busy_wait();
tail = self.tail.load(Ordering::Relaxed);
} else {
busy_wait();
tail = self.tail.load(Ordering::Relaxed);
}
In both branches we reload the tail so the retry observes its latest value.
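For completeness, a symmetric sketch of the pop path (paraphrasing crossbeam's bounded queue; the empty check and error plumbing are elided):
// Pop: a slot is ready when its stamp equals head + 1, i.e. the push
// on this lap has completed and bumped the stamp.
let index = head & (self.mark_bit - 1);
let lap = head & !(self.one_lap - 1);
let slot = &self.buffer[index];
let stamp = slot.stamp.load(Ordering::Acquire);

if head + 1 == stamp {
    let new_head = if index + 1 < self.buffer.len() {
        head + 1
    } else {
        lap.wrapping_add(self.one_lap)
    };
    if self
        .head
        .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Relaxed)
        .is_ok()
    {
        // Read the value out and free the slot for the next lap.
        let value = slot.value.with_mut(|v| unsafe { v.read().assume_init() });
        slot.stamp
            .store(head.wrapping_add(self.one_lap), Ordering::Release);
        return Ok(value);
    }
} else if stamp == head {
    // The slot is still empty on this lap: the whole queue may be empty;
    // take a full_fence and re-check, or spin and retry.
}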
Design IPC
[Week 1]
IPC (refactored with AI to reduce redundant design.)
The idea is to leverage the convenience of uring to establish asynchronous communication.
User:
- Calls an asynchronous kernel function, e.g. async_read(...).
- The function creates a Future specific to uring, called UringFuture.
- spawn() wraps the future as a Task and pushes it into the Executor.
- When the Executor first polls the task, it adds the task to the SQ.
Kernel:
- Initializes the SQ, CQ, and Executor.
- Reads and executes SQEs.
- Writes CQEs for the completed tasks.
Executor:
- Polls tasks.
- Submits the SQEs of several tasks in a batch via io_uring_submit.
- Reads each CQE and wakes the corresponding TaskRef, thus driving the future forward.
Under this logic, any asynchronous kernel call made by the user ends up as an SQE. The kernel side owns both the Executor and the Reactor. A sketch of the user-side future follows.
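A minimal sketch of the user-side flow above. The helpers submit_sqe (push an SQE and record the waker) and take_cqe (drain the CQ looking for a matching entry) are assumptions for illustration, not part of any real API:
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, Waker};

struct Cqe { result: i32 }

// Hypothetical runtime hooks; stubbed out here.
fn submit_sqe(_waker: Waker) -> u64 { /* push an SQE, record the waker */ 0 }
fn take_cqe(_id: u64) -> Option<Cqe> { /* drain the CQ, match by id */ None }

struct UringFuture {
    id: Option<u64>, // serial number of our SQE once submitted
}

impl Future for UringFuture {
    type Output = Cqe;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Cqe> {
        match self.id {
            // First poll: push an SQE onto the SQ and remember its id.
            None => {
                self.id = Some(submit_sqe(cx.waker().clone()));
                Poll::Pending
            }
            // Later polls: check whether the kernel has written our CQE.
            Some(id) => match take_cqe(id) {
                Some(cqe) => Poll::Ready(cqe),
                None => Poll::Pending,
            },
        }
    }
}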
Tokio-Uring
[Week 1]
tokio-uring initializes the io-uring SQ/CQ on top of an epoll-based runtime. When the kernel pushes a completion event onto the completion queue, epoll_wait unblocks and returns a readiness event. The Tokio current-thread runtime then polls the io-uring driver task, draining the completion queue and notifying completion futures.
tokio-uring uses a thread-per-core design in which completion futures do not implement Send, so the tasks awaiting these operations are not Send either.
Thus the user must take care of load balancing; one suggestion follows:
let listener = Arc::new(TcpListener::bind(...));
spawn_on_each_thread(async {
loop {
// Wait for this worker to have capacity. This
// ensures there are a minimum number of workers
// in the runtime that are flagged as with capacity
// to avoid total starvation.
current_worker::wait_for_capacity().await;
let socket = listener.accept().await;
spawn(async move { ... });
}
})
tokio-uring uses ownership to hand resource control to the runtime, avoiding the cancellation problem.
// The caller allocates a buffer
let buf = buf::IoBuf::with_capacity(4096);
// Read the first 1024 bytes of the file, when `std::ops::Try` is
// stable, the `?` can be applied directly on the `BufResult` type.
let BufResult(res, buf) = file.read_at(0, buf.slice(0..1024)).await;
Operation futures may not own the data referenced by an in-flight operation; the tokio-uring runtime takes ownership and stores the resources referenced by operations while they are in flight. The runtime holds these resources as follows:
struct IoUringDriver {
// Storage for state referenced by in-flight operations
in_flight_operations: Slab<Operation>,
// The io-uring submission and completion queues.
queues: IoUringQueues,
}
struct Operation {
// Resources referenced by the kernel, this must stay
// available until the operation completes.
state: State,
lifecycle: Lifecycle,
}
enum Lifecycle {
/// The operation has been submitted to uring and is currently in-flight
Submitted,
/// The submitter is waiting for the completion of the operation
Waiting(Waker),
/// The submitter no longer has interest in the operation result.
Ignored,
/// The operation has completed. The completion result is stored.
Completed(Completion),
}
- The Operation struct keeps the data referenced by an operation submitted to the kernel.
- The runtime allocates an Operation to store the data the submitted operation needs, starting in the Submitted state.
- When the runtime receives the completion result, it stores the result and transitions to the Completed state.
- If the future is dropped before the operation completes, its drop handler removes the request if possible; otherwise it sets the state to Ignored and submits a cancellation request to the kernel. (A sketch of the completion-side transition follows.)
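A hedged sketch of how the driver might advance Lifecycle when a CQE arrives, based on the design-doc types above (complete() is an assumed helper, not tokio-uring's real API):
impl Operation {
    fn complete(&mut self, completion: Completion) {
        // Store the result, then react according to the previous state.
        match std::mem::replace(&mut self.lifecycle, Lifecycle::Completed(completion)) {
            // Nobody has polled yet; the result simply waits to be picked up.
            Lifecycle::Submitted => {}
            // A future is parked on this operation: wake it so its next
            // poll observes the Completed state.
            Lifecycle::Waiting(waker) => waker.wake(),
            // The future was dropped; the driver can release the stored
            // resources and the result now.
            Lifecycle::Ignored => { /* free the slab slot */ }
            Lifecycle::Completed(_) => unreachable!("operation completed twice"),
        }
    }
}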
Evering
[Week 2]
Uring
A lock-free queue built on atomic operations.
pub trait UringSpec {
    /// Element type of buffer A.
    type A;
    /// Element type of buffer B.
    type B;
    /// Extra user data stored in the header.
    type Ext = ();
    /// Compute the combined layout header | buf_A | buf_B and the byte
    /// offsets of the two buffers.
    fn layout(size_a: usize, size_b: usize) -> Result<(Layout, usize, usize), LayoutError> {
        let layout_header = Layout::new::<Header<Self::Ext>>();
        let layout_a = Layout::array::<Self::A>(size_a)?;
        let layout_b = Layout::array::<Self::B>(size_b)?;
        let (comb_ha, off_a) = layout_header.extend(layout_a)?;
        let (comb_hab, off_b) = comb_ha.extend(layout_b)?;
        let comb_hab = comb_hab.pad_to_align();
        Ok((comb_hab, off_a, off_b))
    }
}
A uring is based on a contiguous memory layout header | buf_A | buf_B, and we expose the API for finer manipulation by the user. For a tight layout, buf_A and buf_B should have array sizes that are powers of 2. To restrict the input size, we use a Pow2 type with const fn new(pow: usize) -> Pow2 as a compile-time check; a sketch follows.
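A sketch of such a compile-time guard (the actual Pow2 in the repo may differ; here new is assumed to take the size itself and assert in const context):
pub struct Pow2(usize);

impl Pow2 {
    pub const fn new(size: usize) -> Pow2 {
        // In const evaluation this assert fails at compile time, so an
        // invalid buffer size can never reach runtime.
        assert!(size.is_power_of_two(), "buffer size must be a power of two");
        Pow2(size)
    }
}

const SIZE_A: Pow2 = Pow2::new(64);  // compiles
// const BAD: Pow2 = Pow2::new(24);  // rejected at compile time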
A RawUring is a raw-pointer view of this memory layout. UringA or UringB then initializes a buffer as a Queue structure to manipulate it, e.g.
impl<S: UringSpec> UringReceiver for UringA<S> {
type T = S::B;
fn receiver(&mut self) -> Queue<'_, Self::T> {
self.queue_b()
}
}
Reasonably, we can make several copies of the RawUring so that UringA acts as multiple producers and UringB as the consumer.
Queue is a structure holding a known Range and a pointer to the buffer. Range contains head and tail, maintained with atomic operations to stay lock-free; on top of this we implement enqueue, dequeue, etc. A sketch follows.
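A minimal sketch of a single-producer enqueue on such a Queue, assuming the field names head/tail/buf and a power-of-two capacity (the actual repo layout differs):
use core::cell::UnsafeCell;
use core::mem::MaybeUninit;
use core::sync::atomic::{AtomicUsize, Ordering};

pub struct Queue<'a, T> {
    head: &'a AtomicUsize,
    tail: &'a AtomicUsize,
    buf: &'a [UnsafeCell<MaybeUninit<T>>],
}

impl<'a, T> Queue<'a, T> {
    /// Single-producer enqueue; head/tail are free-running counters and
    /// the power-of-two capacity lets `& (len - 1)` act as modulo.
    pub fn enqueue(&mut self, value: T) -> Result<(), T> {
        let tail = self.tail.load(Ordering::Relaxed);
        let head = self.head.load(Ordering::Acquire);
        if tail.wrapping_sub(head) == self.buf.len() {
            return Err(value); // ring is full
        }
        unsafe {
            (*self.buf[tail & (self.buf.len() - 1)].get()).write(value);
        }
        // Release: publish the write before the consumer sees the new tail.
        self.tail.store(tail.wrapping_add(1), Ordering::Release);
        Ok(())
    }
}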
Driver
To understand the driver, we should first know why we need it.
pub struct Runtime<P, U: Uring> {
pub executor: Executor,
pub uring: RefCell<U>,
pub driver: Driver<P, U::Ext>,
pub pending_submissions: RefCell<VecDeque<LocalWaker>>,
}
A runtime contains a uring, an executor/reactor, and a driver. The executor/reactor can spawn asynchronous tasks (Futures) and wake the executor through a Waker. A uring handles the connection between client and server (or any pair in such a communication model) by submitting tasks and receiving completions. To connect a Future to the uring, so that polling wakes the right submitted task, we need a Driver as the intermediary.
The Driver records each operation the uring submits; the future logic can then poll based on the state of the submitted operation and use the stored waker to achieve asynchronous communication.
Now we lay the flow out explicitly:
- An asynchronous function is called; internally it calls submit to register an Op on the Driver.
- submit sends the related data to the uring SQ.
- The kernel side does its work and pushes the results to the CQ.
- The Driver runs a loop that retrieves data from the CQ and transforms it if needed.
- The asynchronous function's future logic asks the Driver for the data if available, or registers its waker on the first poll (see the sketch after the type definitions below).
- Then we await the answer!
We abstract this as Submitter = Bridge<Submit> and Receiver = Bridge<Receive>, where Bridge { driver: Driver }.
Each must own a reference to the Driver to submit or receive data. The Driver is a cache that should support fast lock-free reads and writes. (There seems to be no such data structure in a no-std environment, so we fall back to lock_api, with Driver<L> where L = lock_api::Mutex.)
struct Driver<T> {
    cache: Slab<OpSign<T>>,
}
enum OpState<T> {
    Waiting(Waker),
    Completion(T),
}
struct OpSign<T> {
    state: OpState<T>,
}
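A hedged sketch of the future-side poll from the flow above, using the types just defined (poll_op and key are illustrative names; key is the Slab index returned at registration):
use std::task::{Context, Poll};

fn poll_op<T>(driver: &mut Driver<T>, key: usize, cx: &mut Context<'_>) -> Poll<T> {
    let ready = matches!(driver.cache[key].state, OpState::Completion(_));
    if ready {
        // The result has arrived: remove the entry and return the value.
        match driver.cache.remove(key).state {
            OpState::Completion(value) => Poll::Ready(value),
            OpState::Waiting(_) => unreachable!(),
        }
    } else {
        // Not ready yet: (re-)register the latest waker and park.
        driver.cache[key].state = OpState::Waiting(cx.waker().clone());
        Poll::Pending
    }
}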
One problem is the abnormal workflow where the future is dropped while the data transmission is still in flight. If the future drops, whatever data is later received must be discarded.
struct OpSign<T> {
    state: OpState<T>,
    cancelled: AtomicBool,
}
We use an AtomicBool to record the discard signal when the future is dropped. When a completion later arrives, we check the flag and drop the received data; a sketch follows.
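A sketch of the driver-side check under these assumptions (complete and the field names are illustrative):
use core::sync::atomic::Ordering;

impl<T> Driver<T> {
    fn complete(&mut self, key: usize, value: T) {
        if self.cache[key].cancelled.load(Ordering::Acquire) {
            // The future is gone: discard the data and free the slot.
            drop(self.cache.remove(key));
            return;
        }
        // Store the result and wake the waiting future, if any.
        let prev = core::mem::replace(
            &mut self.cache[key].state,
            OpState::Completion(value),
        );
        if let OpState::Waiting(waker) = prev {
            waker.wake();
        }
    }
}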
Unluckily, the original repo does not structure this well, and everything is messy there.
Shared Memory
[Week 5]
pub struct ShmBox<T: ?Sized>(NonNull<T>);
ShmBox is a Box<T>-like structure whose lifetime is handled by a shared-memory-aware allocator.
Given such an allocator, we allocate a contiguous memory range:
/// [`ShmHeader`] contains necessary metadata of a shared memory region.
///
/// Memory layout of the entire shared memory is illustrated as below,
///
/// ```svgbob
/// .-----------------------------------------------------------------------------.
/// | | | | |
/// | [1] uring offsets | [2] allocator | [3] uring buffers | [4] free memory ... |
/// | ^ | | | ^ |
/// '-|-------------------------------------------------------------------------|-'
/// '-- start of the shared memory (page aligned) |
/// end of the shared memory --'
/// ```
The design is raw: given a finite range, the allocator must ensure everything it allocates lies inside the range backed by the file mapping. We then handle everything explicitly through NonNull<T>. We also need a 'static file descriptor to keep the mapping consistent.
Thus an independent allocator with a concrete memory layout can serve as an isolated shared-memory constructor, yielding ShmBox<T, A>. A box can be coupled with an inter-process atomic Mutex for reading/writing, or better, a signalling design built on uring if possible.
A general design would take an A: Allocator; however, Rust leans on the global allocator and the Allocator API is still experimental. If we instead build an independent memory-management layer on raw NonNull<T>, as mentioned before, the maintenance burden becomes unbearable.
As for the layout, it is delegated to the allocator itself, which does its best to keep memory contiguous; thus allocator + uring + free memory forms one unified instance with a small, stable exposed API. A toy sketch of the allocation path follows.
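A toy sketch of the allocation path under these constraints (ShmAllocator, new_in, and the bump strategy are all assumptions; a real allocator must also stay valid across processes):
use core::alloc::Layout;
use core::cell::Cell;
use core::ptr::NonNull;

/// Toy bump allocator over the "[4] free memory" region of the mapping.
pub struct ShmAllocator {
    next: Cell<usize>, // current bump pointer (an address inside the mapping)
    end: usize,        // one past the end of the mapped range
}

impl ShmAllocator {
    pub fn alloc<T>(&self) -> NonNull<T> {
        let layout = Layout::new::<T>();
        // Round the bump pointer up to the required alignment.
        let start = (self.next.get() + layout.align() - 1) & !(layout.align() - 1);
        let new_next = start + layout.size();
        assert!(new_next <= self.end, "shared region exhausted");
        self.next.set(new_next);
        NonNull::new(start as *mut T).unwrap()
    }
}

pub struct ShmBox<T: ?Sized>(NonNull<T>);

impl<T> ShmBox<T> {
    /// Place `value` inside the shared region owned by `alloc`.
    pub fn new_in(value: T, alloc: &ShmAllocator) -> ShmBox<T> {
        let ptr = alloc.alloc::<T>();
        unsafe { ptr.as_ptr().write(value) };
        ShmBox(ptr)
    }
}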