Embassy Investigation
Asynchronous
The Future trait provides a poll method that reports whether the result is pending or ready, together with a Waker that can be used to request another poll later.
Runtime/Executor: the concrete implementation that schedules and polls futures.
async/await simplifies callback-style workflows. An await suspends the task and returns control to the executor until the output is produced, and none of this requires manual callback handling.
The whole process does not need a separate stack per task, so memory does not grow with the number of stacks under heavy concurrency.
However, it suits only I/O-intensive work, not CPU-intensive work, because the latter has nothing to wait for. Even if the runtime switches between tasks, the total time is not reduced: there are no blocked or waiting periods to skip, and all of the computation still has to be done.
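To make the poll/Waker contract concrete, here is a minimal sketch using only the Rust standard library. CountDown, ThreadWaker and block_on are hypothetical names chosen for illustration; this is not how embassy implements its executor, only the smallest loop that honors the Future contract.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Hypothetical future: pending for `remaining` polls, then ready.
pub struct CountDown {
    pub remaining: u32,
}

impl Future for CountDown {
    type Output = &'static str;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.remaining == 0 {
            return Poll::Ready("done");
        }
        self.remaining -= 1;
        // Ask to be polled again. A real future would hand the waker to
        // an interrupt handler or I/O driver instead of waking itself.
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

/// Waker that unparks the thread running `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal single-future executor: poll, sleep while pending, repeat.
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            // Sleep until some waker calls `unpark` on this thread.
            Poll::Pending => thread::park(),
        }
    }
}
```

Calling block_on(CountDown { remaining: 3 }) polls the future four times; between polls the thread parks instead of spinning, which is the "return control until woken" behavior described above.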
Embedded
embassy is a lightweight async runtime for embedded systems.
Expansion of #[embassy_executor::task]
#[doc(hidden)]
async fn __run1_task() {
    loop {
        log::info!("tick for 1 sec");
        embassy_time::Timer::after_secs(1).await;
    }
}
fn run1() -> ::embassy_executor::SpawnToken<impl Sized> {
    type Fut = impl ::core::future::Future + 'static;
    const POOL_SIZE: usize = 1;
    static POOL: ::embassy_executor::raw::TaskPool<Fut, POOL_SIZE> =
        ::embassy_executor::raw::TaskPool::new();
    unsafe { POOL._spawn_async_fn(move || __run1_task()) }
}
TaskPool is a fixed-size array of slots that all hold the same Fut type; here its length is 1. So if you spawn the same task function again while the first instance is still running, there is no free slot and the spawn fails with an error.
Another design constraint: TaskStorage, and the raw Executor, should stay alive for the whole lifetime of the program. Ways to get such storage:
- const/static
- Box::leak()
- transmute() with a static lifetime annotation
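As a small illustration of the first two options, the sketch below obtains a 'static reference either from a static item or by leaking a heap allocation. Storage is a hypothetical stand-in, not embassy's TaskStorage, and the transmute route is left out because it is unsafe and easy to get wrong.

```rust
use std::sync::atomic::AtomicU32;

/// Hypothetical stand-in for per-task storage.
pub struct Storage {
    pub polls: AtomicU32,
}

/// Option 1: a `static` item lives for the whole program.
pub static STATIC_STORAGE: Storage = Storage {
    polls: AtomicU32::new(0),
};

/// Option 2: leak a heap allocation so it is never freed.
/// Requires an allocator, so it is not always available on bare metal.
pub fn leaked_storage() -> &'static Storage {
    Box::leak(Box::new(Storage {
        polls: AtomicU32::new(0),
    }))
}
```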
Struct:
- TaskStorage/TaskPool ~ [TaskStorage; N]: used to spawn tasks.
- AvailableTask: reuses a TaskStorage, using claim to recycle it and initialize to spawn a new function.
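The claim-and-fail behavior of a fixed-size pool can be modeled roughly as follows. Slot, Pool, SpawnError and release are hypothetical names for this sketch; the real TaskStorage also holds the future and the task header, which are omitted here.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Hypothetical stand-in for `TaskStorage`: only the "claimed" flag.
pub struct Slot {
    claimed: AtomicBool,
}

/// Hypothetical stand-in for `TaskPool`: `N` slots of one task type.
pub struct Pool<const N: usize> {
    slots: [Slot; N],
}

#[derive(Debug, PartialEq)]
pub enum SpawnError {
    /// Every slot is already in use by a running task.
    Busy,
}

impl<const N: usize> Pool<N> {
    pub fn new() -> Self {
        Pool {
            slots: std::array::from_fn(|_| Slot {
                claimed: AtomicBool::new(false),
            }),
        }
    }

    /// Claim the first free slot and return its index, or fail if the
    /// pool is full.
    pub fn spawn(&self) -> Result<usize, SpawnError> {
        for (index, slot) in self.slots.iter().enumerate() {
            let won = slot
                .claimed
                .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
                .is_ok();
            if won {
                return Ok(index);
            }
        }
        Err(SpawnError::Busy)
    }

    /// Mark a slot as free again once its task has finished.
    pub fn release(&self, index: usize) {
        self.slots[index].claimed.store(false, Ordering::Release);
    }
}
```

With a pool of size 1, a second spawn fails until the first slot is released, which mirrors the POOL_SIZE = 1 case in the macro expansion.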
Attempt of U-Level Embassy
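- implement a simple CondVar (axsync module).
  - implement FUTEX_WAIT_QUEUES:
    pub type Futex = AtomicU32;
    fn futex_wake(futex: &Futex);
    fn futex_wake_all(futex: &Futex);
    fn futex_wait(futex: &Futex, expected: u32, timeout: Option<Duration>) -> bool;
  - implement S-level code:
    fn wait_optional_timeout<'a, T>(
        &self,
        guard: MutexGuard<'a, T>,
        timeout: Option<Duration>,
    ) -> Option<MutexGuard<'a, T>> {
        // Snapshot the futex value first, so a notify that lands before
        // futex_wait changes the value and the wait returns immediately.
        let expected = self.futex.load(core::sync::atomic::Ordering::Relaxed);
        // Note: the mutex has to be unlocked before waiting. This assumes
        // guard.mutex() releases the guard; if it only borrows it, the
        // guard must be dropped before futex_wait.
        let mutex = guard.mutex();
        let suc = futex_wait(&self.futex, expected, timeout);
        // Re-acquire the lock before returning to the caller.
        let new_guard = mutex.lock();
        if !suc && timeout.is_some() {
            None
        } else {
            Some(new_guard)
        }
    }
  - implement U-level, roughly same.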
- implement
- implement embassy_executor::Executor in the same style as features = std, using CondVar as the Signaler.
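A condition-variable signaler of this kind can be sketched with the standard library as below. This models the idea with std::sync::Condvar rather than the axsync CondVar, and the Signaler name and its methods are assumptions for illustration: the executor thread sleeps in wait, and a waker calls signal to make it poll again.

```rust
use std::sync::{Condvar, Mutex};

/// Hypothetical signaler: lets the executor thread sleep until woken.
pub struct Signaler {
    signaled: Mutex<bool>,
    condvar: Condvar,
}

impl Signaler {
    pub fn new() -> Self {
        Signaler {
            signaled: Mutex::new(false),
            condvar: Condvar::new(),
        }
    }

    /// Block until `signal` has been called, then clear the flag.
    /// The loop guards against spurious wakeups.
    pub fn wait(&self) {
        let mut signaled = self.signaled.lock().unwrap();
        while !*signaled {
            signaled = self.condvar.wait(signaled).unwrap();
        }
        *signaled = false;
    }

    /// Set the flag and wake the waiting executor thread, if any.
    /// A signal sent before `wait` is not lost because the flag stays set.
    pub fn signal(&self) {
        let mut signaled = self.signaled.lock().unwrap();
        *signaled = true;
        self.condvar.notify_one();
    }
}
```

The executor loop would then alternate between polling all ready tasks and calling wait, so the CPU is yielded whenever nothing is runnable.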
There is no timer logic at this point, so tasks cannot wait on a timer yet.
Timer Queue
[dependencies]
embassy-time-driver = { version = "...", features = ["tick-hz-1000"] } # Driver fixes embassy TICK_HZ
embassy-time-queue-utils = { version = "..." }
- schedule_wake(at: u64, waker: &Waker): Adds a new timer event to the queue. It merges wakers if a timer for the same task already exists (to avoid multiple queue entries for one task). It returns true if the newly added event becomes the earliest event in the queue, which tells the driver that the hardware timer has to be reprogrammed.
- poll(current_time: u64): This method is called by the driver's interrupt handler. It iterates through the queue and identifies all timer events whose target time (at) is less than or equal to current_time. For each expired event, it calls waker.wake_by_ref(). It removes expired events from the queue.
- next_expiration(current_time: u64) -> u64: Returns the target time of the earliest event still remaining in the queue whose time is strictly greater than current_time. Returns u64::MAX or a similar indicator if the queue is empty or all events are in the past/current.
Workflow:
- An async task in your application calls an embassy-time function like Delay::delay_ms(1000).await.
- embassy-time calculates the absolute target wake-up time in embassy ticks (current_embassy_ticks + duration_in_embassy_ticks).
- embassy-time calls your driver's Driver::schedule_wake(target_time, task_waker).
- Your schedule_wake implementation (within a critical section) calls queue.schedule_wake(target_time, task_waker).
- If queue.schedule_wake(...) returns true (because the new timer is the earliest), your schedule_wake gets the new earliest time from the queue using queue.next_expiration(self.now()). It then calls the hardware timer's API (custom: set_alarm_at) to set the next interrupt to fire at this time (converting the time from embassy ticks to hardware timer units using axconfig::devices::TIMER_FREQUENCY).
- Eventually, the hardware timer reaches the programmed value and triggers an interrupt; the handler gets the current time (Driver::now()).
- The handler calls queue.next_expiration(current_time). This iterates the queue, finds expired timers, and calls waker.wake_by_ref() for each, marking the associated tasks as ready.
- The embassy-executor, having been woken by the interrupt, polls the tasks that were just woken up.
- The task that was waiting on the timer is polled, sees its waker was woken, and resumes execution past the .await point.
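The queue side of this workflow can be sketched with a plain Vec, as below. TimerQueue is a hypothetical, allocation-based model written for this note and is not the embassy-time-queue-utils implementation; it only mirrors the two calls used in the workflow, with next_expiration both waking expired entries and returning the next deadline.

```rust
use std::task::Waker;

/// Hypothetical timer queue: (deadline in ticks, waker) pairs.
pub struct TimerQueue {
    entries: Vec<(u64, Waker)>,
}

impl TimerQueue {
    pub fn new() -> Self {
        TimerQueue { entries: Vec::new() }
    }

    /// Register `waker` to fire at tick `at`. Returns true when the new
    /// deadline is earlier than everything queued so far, meaning the
    /// hardware alarm has to be reprogrammed.
    pub fn schedule_wake(&mut self, at: u64, waker: &Waker) -> bool {
        let earliest = self
            .entries
            .iter()
            .map(|(deadline, _)| *deadline)
            .min()
            .unwrap_or(u64::MAX);
        // Merge with an existing entry for the same task, if any.
        match self.entries.iter_mut().find(|(_, w)| w.will_wake(waker)) {
            Some(entry) => entry.0 = entry.0.min(at),
            None => self.entries.push((at, waker.clone())),
        }
        at < earliest
    }

    /// Wake and remove every entry due at or before `now`, then return
    /// the next pending deadline, or `u64::MAX` if the queue is empty.
    pub fn next_expiration(&mut self, now: u64) -> u64 {
        let mut next = u64::MAX;
        self.entries.retain(|(at, waker)| {
            if *at <= now {
                waker.wake_by_ref();
                false
            } else {
                next = next.min(*at);
                true
            }
        });
        next
    }
}
```

A driver would call schedule_wake inside a critical section and, when it returns true, program the alarm with the value of next_expiration(now); the interrupt handler would call next_expiration again and re-arm the alarm with its result.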
Embassy Executor
- The embassy executor, when running inside an OS, should park to yield the CPU until it is woken by a specific event.
- axtask should wake the specific future in the specific task. The executor then runs again to poll it.
Executor Workflow:
- keeps a queue of tasks
- polls each task in turn until it blocks and yields
- enqueues the blocked task and proceeds to poll the next one
- repeat…
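The loop above can be modeled with a small run-queue executor built on the standard library. MiniExecutor, TaskWaker, RunQueue and yield_now are hypothetical names for this sketch; embassy itself uses an intrusive, allocation-free run queue, whereas this version uses Box and VecDeque for brevity.

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

type Task = Pin<Box<dyn Future<Output = ()> + Send>>;

/// Ids of tasks that are ready to be polled.
#[derive(Default)]
struct RunQueue(Mutex<VecDeque<usize>>);

/// Waking a task means putting its id back on the run queue.
struct TaskWaker {
    id: usize,
    queue: Arc<RunQueue>,
}

impl Wake for TaskWaker {
    fn wake(self: Arc<Self>) {
        self.queue.0.lock().unwrap().push_back(self.id);
    }
}

/// Hypothetical executor: a task table plus a run queue.
pub struct MiniExecutor {
    tasks: Vec<Option<Task>>,
    queue: Arc<RunQueue>,
}

impl MiniExecutor {
    pub fn new() -> Self {
        MiniExecutor { tasks: Vec::new(), queue: Arc::default() }
    }

    /// Store the task and mark it ready for its first poll.
    pub fn spawn(&mut self, fut: impl Future<Output = ()> + Send + 'static) {
        let id = self.tasks.len();
        self.tasks.push(Some(Box::pin(fut)));
        self.queue.0.lock().unwrap().push_back(id);
    }

    fn pop(&self) -> Option<usize> {
        self.queue.0.lock().unwrap().pop_front()
    }

    /// Poll ready tasks one by one until the run queue is empty.
    /// Returns the number of polls performed.
    pub fn run_until_idle(&mut self) -> usize {
        let mut polls = 0;
        while let Some(id) = self.pop() {
            let Some(task) = self.tasks[id].as_mut() else { continue };
            let waker = Waker::from(Arc::new(TaskWaker { id, queue: self.queue.clone() }));
            let mut cx = Context::from_waker(&waker);
            polls += 1;
            if task.as_mut().poll(&mut cx).is_ready() {
                self.tasks[id] = None; // finished: drop the future
            }
        }
        polls
    }
}

/// Future that yields to the executor exactly once.
pub struct YieldNow(bool);

impl Future for YieldNow {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            return Poll::Ready(());
        }
        self.0 = true;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

pub fn yield_now() -> YieldNow {
    YieldNow(false)
}
```

In a real system the executor would sleep once run_until_idle returns and be woken by an interrupt handler or timer, instead of returning to the caller.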
Interrupts Workflow:
- the polled task triggers a peripheral operation that will raise an interrupt
- the interrupt is received by the handler
- the handler updates the peripheral's state
- the handler notifies the executor to poll the task
Embassy Preempt
- Thread: owns a private stack
- Coroutine: owns a shared stack
We don't know which task is really sync and which is async; we only know the reason for the poll and whether the task has a stack or not.
We follow a rule based on the poll reason:
- An await poll runs without a private stack, whether or not the task has one. After an await poll, the task has no private stack, whatever its previous state.
- An interrupt poll runs with a private stack, whether or not the task has one; if it has none, we make one. After an interrupt poll, the task has a private stack, whatever its previous state.
- Common
  - Find the highest-priority task
  - Set the alarm callback
- await poll (cooperative)
  - common
  - poll the highest-priority task
  - if the task is in Thread mode or has its own stack, drop the stack and go back to coroutine mode.
  - run the task until pending
  - update the task state to NotReady
  - seek the next task and repeat
  The stack is reused after the poll because the task yielded actively.
- interrupt poll (preemptive)
  - common
  - For a stackless task, a stack is allocated so that it behaves just like a task with a stack.
    Comment: We construct a "trampoline", a temporary stack that unifies the workflow between stack and stackless tasks. This unifies and simplifies the workflow for PendSV.
  - IntCtxSW/PendSV:
    - await mode: a stack/mimic-stack task takes over from a stackless (shared stack) task that yielded by .await.
    - interrupt mode: a stackless (shared stack) task is interrupted and taken over by a stack/mimic-stack task.
    - switch to the new task's stack and update the PSP pointer to the new task.
    - return to normal mode and execute.
  - trigger PendSV
  The stack is transformed into the task's own private stack.
Life Time:
- Sync:
  - Encapsulated in a future as pseudo-async.
  - Behaves like a coroutine before an interrupt.
  - After being interrupted, changes from the shared stack to its own private stack, as a Thread.
  - No .await means it continues as a Thread, as handled in the interrupt workflow.
- Async:
  - Same as Sync.
  - Behaves as a coroutine at .await before an interrupt.
  - After being interrupted, changes from the shared stack to its own private stack, as a Thread.
  - .await means its private stack is dropped and it behaves as a coroutine again.
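The stack rule of this section boils down to a transition that depends only on the poll reason and not on the previous state. A minimal model, with PollReason, StackState and stack_after_poll as hypothetical names that are not taken from the embassy preempt code:

```rust
/// Why the scheduler is switching tasks.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PollReason {
    /// The running task yielded at an `.await` (cooperative).
    Await,
    /// The running task was preempted by an interrupt.
    Interrupt,
}

/// Which kind of stack a task occupies.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StackState {
    /// Coroutine mode: runs on the shared stack.
    Shared,
    /// Thread mode: owns a private stack.
    Private,
}

/// The previous state is ignored on purpose: only the poll reason
/// decides which stack the task ends up with.
pub fn stack_after_poll(_before: StackState, reason: PollReason) -> StackState {
    match reason {
        PollReason::Await => StackState::Shared,
        PollReason::Interrupt => StackState::Private,
    }
}
```

For example, a task that was preempted (Private) and later reaches an .await goes back to Shared, which matches the Async lifetime described above.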
Embassy Logic:
Initialize an AvailableTask, which contains a TaskStorage, which in turn contains poll_fn and future:
fn initialize_impl<S>(self, future: impl FnOnce() -> F) -> SpawnToken<S> {
    unsafe {
        self.task.raw.poll_fn.set(Some(TaskStorage::<F>::poll));
        self.task.future.write_in_place(future);
        let task = TaskRef::new(self.task);
        SpawnToken::new(task)
    }
}
A spawn token is nothing but a pointer to the raw task.
pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<impl Sized> {
    let task = AvailableTask::claim(self);
    match task {
        Some(task) => task.initialize(future),
        None => SpawnToken::new_failed(),
    }
}
Executor is:
#[repr(transparent)]
pub struct Executor {
    pub(crate) inner: SyncExecutor,
    _not_sync: PhantomData<*mut ()>,
}
Here inner does the spawning: it enqueues every TaskRef at the head of its RunQueue.
__pender is the executor's context, used to notify it of events.
The executor polls each task by executing the poll_fn of every TaskRef as it is dequeued.