Embassy Investigation
Asynchronous
Future
Trait will be given with a poll
to check pending or ready. And the ability to prod Waker
to poll again in the future.
Runtime/Exectutor
: The real implementation to dispatch/poll Future
.
async/await
is the simplification workflow of callback. Which await
will pause and return control to the executor until the production of output. But the whole workflow requires no need for manual callback handling.
Whole process doesn’t require multiple independent stack which will grows in heavy concurrency.
However, it suitable only for I/O intensive rather CPU intensive operations, because there’s no need to “wait” in the latter situation. Even the runtime dispatch each task, the total time won’t be reduced by jumping off block/wait
period because you still need to finish all.
Embedded
embassy
is a light runtime for embedded system.
Expansion of #[embassy_executor::task]
#[doc(hidden)]
async fn __run1_task() {
loop {
log::info!("tick for 1 sec");
embassy_time::Timer::after_secs(1).await;
}
}
fn run1() -> ::embassy_executor::SpawnToken<impl Sized> {
type Fut = impl ::core::future::Future + 'static;
const POOL_SIZE: usize = 1;
static POOL: ::embassy_executor::raw::TaskPool<Fut, POOL_SIZE> =
::embassy_executor::raw::TaskPool::new();
unsafe { POOL._spawn_async_fn(move || __run1_task()) }
}
TaskPool
is a fixed size array with same type of the Fut
, with length 1. So if you spawn the same function, it will be contaminated, resulting error.
Another design is TaskStorage
which is the raw Executor
should be alive throughout the program life time.
const/static
Box::leak()
transmute()
with a static lifetime notation
Struct:
TaskStorage
/TaskPool ~ [TaskStorage;N]
spawn task.AvailableTask
reuseTaskStorage
byclaim
to recycle andinitialize
to spawn new function.
Attempt of U-Level Embassy
- implement simple
CondVar
(axsync
module).- implement
FUTEX_WAIT_QUEUES
:
pub type Futex = AtomicU32 fn futex_wake(futex:&Futex); fn futex_wake_all(futex:&Futex); fn futex_wait(futex: &Futex, expected: u32, timeout: Option<Duration>) -> bool;
- implement S-level code:
fn wait_optional_timeout<'a, T>( &self, guard: MutexGuard<'a, T>, timeout: Option<Duration>, ) -> Option<MutexGuard<'a, T>> { let expected = self.futex.load(core::sync::atomic::Ordering::Relaxed); let mutex = guard.mutex(); let suc = futex_wait(&self.futex, expected, timeout); let new_guard = mutex.lock(); if !suc && timeout.is_some() { None } else { Some(new_guard) } }
- implement U-level, roughly same.
- implement
- implement
embassy_executor::Executor
with style same likefeatures = std
. Which useCondVar
asSignaler
.
There’s no timer logic here, so we can’t wait.
Timer Queue
[dependencies]
embassy-time-driver = { version = "...", features = ["tick-hz-1000"] } # Driver fixes embassy TICK_HZ
embassy-time-queue-utils = { version = "..." }
schedule_wake(at: u64, waker: &Waker)
: Adds a new timer event to the queue. It merges wakers if a timer for the same task already exists (to avoid multiple queue entries for one task). It returnstrue
if the newly added event becomes the earliest event in the queue, indicating operations by retrieving hardware timer’s time.poll(current_time: u64)
: This method is called by the driver’s interrupt handler. It iterates through the queue and identifies all timer events whose target time (at) is less than or equal tocurrent_time
. For each expired event, it callswaker.wake_by_ref()
. It removes expired events from the queue.next_expiration(current_time: u64) -> u64
: Returns the target time of the earliest event still remaining in the queue whose time is strictly greater thancurrent_time
. Returnsu64::MAX
or a similar indicator if the queue is empty or all events are in the past/current.
Workflow:
- An
async
task in your application calls anembassy-time
function likeDelay::delay_ms(1000).await
- The
embassy-time
calculates the absolute target wake-up time in embassy ticks (current_embassy_ticks + duration_in_embassy_ticks
). embassy-time
calls your driver’sDriver::schedule_wake(target_time, task_waker)
.- Your schedule_wake implementation (within a critical section) calls
queue.schedule_wake(target_time, task_waker)
. - If
queue.schedule_wake(...)
returnstrue
(because the new timer is the earliest), yourschedule_wake
gets the new earliest time from the queue using queue.next_expiration(self.now()
). It then calls hardware timer’s API (custom:set_alarm_at
) to set the next interrupt to fire at this time (converting the time from embassy ticks to hardware timer units usingaxconfig::devices::TIMER_FREQUENCY
). - Eventually, the hardware timer reaches the programmed value and triggers an interrupt,the handler gets the current time (
Driver::now()
). - The handler calls
queue.next_expiration(current_time)
. This iterates the queue, finds expired timers, and callswaker.wake_by_ref()
for each, marking the associated tasks as ready. - The
embassy-executor
, having been woken by the interrupt, polls the tasks that were just woken up. - The task that was waiting on the timer is polled, sees its waker was woken, and resumes execution past the
.await
point.
Embassy Executor
- embassy executor in os, should park to yield cpu until be waken due to specific event.
- axtask should wake specific future in specific task. Then executor will rerun to poll.
Executor Workflow:
- keeps a queue of tasks
- poll task sequentially until it blocks and yielding
- enqueue the block task and proceed to poll next
- repeat…
Interrupts Workflow:
- polled task will trigger peripherals to make a interruption
- interruption will be received by handler
- handler will update peripherals state
- handler will notify executor to poll the task.
Embassy Preempt
Thread: Owns a private stack Coroutine: Owns a shared stack
We don’t know which is real sync or which is async, we only know the poll reason and wether it has stack or not.
We follow the rule of the reason:
- await poll for no stack no matter task has or not. After await poll, it has no stack no matter previous.
- interrupt poll for stack no matter task has or not, if not, we make one. After interrupt poll, it has private stack no matter previous.
- Common
- Find highest prio task
- Set alarm call back
- await poll(cooperative)
- common
- poll the highest priority
- if the stack is in Thread mode or has its stack, drop and back to coroutine.
- run task until
pending
- update task state to
NotReady
- Seek next and repeat Stack will be reused after poll due to yielding actively.
- interupt poll(preemptive)
- common
- Here for stackless task, it will allocate a stack to mimic just as stack task.
Comment: We construct a “trampoline”, a temporary stack to unify the workflow between stack and stackless task. Which unify and simplify workflow for
PendSV
. IntCtxSW
/PendSV
:- await mode: stack/mimic-task task to take over stackless(shared stack) task yielding by
.await
. - interrupt mode: stackless(shared stack) is interrupted to be taken over by stack/mimic-stack task.
- switch to new task stack, update PSP ptr to new task.
- back to normal and execute.
- await mode: stack/mimic-task task to take over stackless(shared stack) task yielding by
- trigger PendSV Stack will be transformed to its own private stack.
Life Time:
- Sync:
- Encapsulated in future as pesudo async.
- Behavoir like a coroutine before interuption.
- After interupted, change the shared stack to its own private stack as Thread.
- No
.await
means it will continue as Thread as handling in interuption workflow.
- Async:
- Same as Sync.
- Behavoir as coroutine in
.await
before interuption. - After interupted, change the shared stack to its own private stack as Thread.
.await
means its private stack will be dropped and behavoir as coroutine.
Embassy Logic:
Initialize a
AvaialableTask
contains:TaskStorage
which containspoll_fn
andfuture
:fn initialize_impl<S>(self, future: impl FnOnce() -> F) -> SpawnToken<S> { unsafe { self.task.raw.poll_fn.set(Some(TaskStorage::<F>::poll)); self.task.future.write_in_place(future); let task = TaskRef::new(self.task); SpawnToken::new(task) } }
Spawn token is nothing but a ptr to the raw task.
pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<impl Sized> { let task = AvailableTask::claim(self); match task { Some(task) => task.initialize(future), None => SpawnToken::new_failed(), } }
Executor is:
#[repr(transparent)] pub struct Executor { pub(crate) inner: SyncExecutor, _not_sync: PhantomData<*mut ()>, }
Where inner will spawn, which enqueue by head to its
RunQueue
everyTaskRef
.__pender
will be the context of executor to notify event.poll every task by execute
poll_fn
in everyTaskRef
in dequeue process.