IPC

Philosophy

Separates the object instances (which are process-specific) from the data they manage (which is shared). This design allows each process to have its own interface objects while manipulating the same underlying shared memory structures through atomic operations.

Each process initiates the different object instances with the same memory data. We should use the spin, atomic and any inter-process synchronization to ensure this.

  • No Ownership: The handle doesn’t own the shared data
  • Pointer-Based: Use NonNull<T> to reference shared memory
  • Safe Drop: Only cleanup process-local resources, never touch shared memory
  • Multiple Handles: Many processes can have handles to the same queue data

Memory Layout

Refer Claude

SHARED MEMORY LAYOUT:
┌─────────────────────────────────────────────────────────────────────┐
│                    SHARED MEMORY REGION                              │
├─────────────────────────────────────────────────────────────────────┤
[General Metadata] <- Ipc related informations            												  │
├─────────────────────────────────────────────────────────────────────┤
[AllocatorMetadata] <- Allocator control structures              │
├─────────────────────────────────────────────────────────────────────┤
[BufferRegistry] <- Maps buffer names to locations                   │ 
├─────────────────────────────────────────────────────────────────────┤
│ ┌─[Buffer1: "events"]──────────────┐ <- Allocated by TLSF           │
│ │ [SharedBufferData][CircularData] │                                 │
│ └─────────────────────────────────┘                                 │
│ ┌─[Buffer2: "control"]─────────────┐                                 │  
│ │ [SharedBufferData][CircularData] │                                 │
│ └─────────────────────────────────┘                                 │
│ ┌─[Buffer3: "data_stream"]─────────┐                                 │
│ │ [SharedBufferData][CircularData] │                                 │
│ └─────────────────────────────────┘                                 │
├─────────────────────────────────────────────────────────────────────┤
│                    FREE SPACE (managed by TLSF)└─────────────────────────────────────────────────────────────────────┘
Shared Memory:
┌─[SharedQueueData<T>]──────────────────┐
│ head: AtomicUsize                        │
│ tail: AtomicUsize                        │ ← NEVER DROPPED
│ capacity: 1000[ring buffer data: T; 1000]└─────────────────────────────────────────┘
           ↑              ↑              ↑
    [Process A]    [Process B]    [Process C]
    PersistentQueue PersistentQueue PersistentQueue
    (handle only)   (handle only)   (handle only)
         ↓              ↓              ↓
    Safe to drop   Safe to drop   Safe to drop

All things should be initiated per process with consistency so we won’t lose information while retain the functionality across.


Refer Claude

We have BufferRegistry contains BufferRegistryEntry to store the concrete offset for Queue.

#[repr(C)]
#[derive(Clone, Copy)]
struct BufferRegistryEntry {
    name: [u8; MAX_BUFFER_NAME_LEN],    // Buffer name (null-terminated)
    buffer_offset: usize,               // Offset from shared memory base
    buffer_size: usize,                 // Total buffer size
    created_by_pid: u32,                // Process that created it
    ref_count: AtomicU32,               // Number of processes using it
    is_active: AtomicU32,               // 1 = active, 0 = deleted
}

impl BufferRegistryEntry {
    fn new() -> Self {
        Self {
            name: [0; MAX_BUFFER_NAME_LEN],
            buffer_offset: 0,
            buffer_size: 0,
            created_by_pid: 0,
            ref_count: AtomicU32::new(0),
            is_active: AtomicU32::new(0),
        }
    }
    
    fn set_name(&mut self, name: &str) {}
    
    fn get_name(&self) -> String {}
}

/// Registry of all buffers - lives in shared memory right after TLSF metadata
#[repr(C, align(64))]
pub struct SharedBufferRegistry {
    magic: u32,
    entry_count: AtomicUsize,
    entries: [BufferRegistryEntry; MAX_BUFFERS],
}

impl SharedBufferRegistry {
    const MAGIC: u32 = 0x42554652; // "BUFR"
    
    pub fn initialize(&mut self) {
        self.magic = Self::MAGIC;
        self.entry_count.store(0, Ordering::Relaxed);
        
        // Initialize all entries
        for entry in &mut self.entries {
            *entry = BufferRegistryEntry::new();
        }
    }
    
    pub fn is_initialized(&self) -> bool {
        self.magic == Self::MAGIC
    }
    
    /// Register a new buffer
    pub fn register_buffer(&mut self, name: &str, offset: usize, size: usize, pid: u32) -> Result<(), BufferError> {
        // Check if buffer already exists
        if self.find_buffer(name).is_some() {
            return Err(BufferError::AlreadyExists);
        }
        
        // Find empty slot
        for entry in &mut self.entries {
            if entry.is_active.load(Ordering::Acquire) == 0 {
                entry.set_name(name);
                entry.buffer_offset = offset;
                entry.buffer_size = size;
                entry.created_by_pid = pid;
                entry.ref_count.store(1, Ordering::Relaxed);
                entry.is_active.store(1, Ordering::Release);
                
                self.entry_count.fetch_add(1, Ordering::Relaxed);
                return Ok(());
            }
        }
        
        Err(BufferError::RegistryFull)
    }
    
    /// Find existing buffer by name
    pub fn find_buffer(&self, name: &str) -> Option<(usize, usize)> {
        for entry in &self.entries {
            if entry.is_active.load(Ordering::Acquire) == 1 && entry.get_name() == name {
                entry.ref_count.fetch_add(1, Ordering::Relaxed);
                return Some((entry.buffer_offset, entry.buffer_size));
            }
        }
        None
    }
    
    /// List all active buffers
    pub fn list_buffers(&self) -> Vec<String> {
        let mut buffers = Vec::new();
        for entry in &self.entries {
            if entry.is_active.load(Ordering::Acquire) == 1 {
                buffers.push(entry.get_name());
            }
        }
        buffers
    }
}

The CommunicationManager should contains the layout of memory and memory base.

/// Master communication manager that coordinates allocator and buffers
pub struct CommunicationManager {
    // Process-specific allocator instance
    allocator: crate::PersistentTlsfAllocator,
    
    // Buffer registry (shared)
    registry: NonNull<SharedBufferRegistry>,
    
    // Process-specific buffer handles cache
    active_buffers: HashMap<String, BufferHandle>,
    
    // Process identification
    process_id: u32,
    shared_memory_base: *mut u8,
}
// shm-on data
pub struct LfQueue<T,N> {}

pub struct SQHandle {
    q: LfQueue,
    // other field...
}

#[repr(C, align(64))]
pub struct SharedQueueData<T> {
    // Queue metadata
    capacity: usize,
    element_size: usize,
    
    // Atomic indices for lock-free operation
    head: AtomicUsize,              // Consumer index
    tail: AtomicUsize,              // Producer index
    
    // Statistics and health tracking
    total_enqueues: AtomicUsize,
    total_dequeues: AtomicUsize,
    overruns: AtomicUsize,          // Failed enqueues due to full queue
    
    // Process tracking for debugging
    producer_pid: AtomicU32,        // Last producer PID
    consumer_pid: AtomicU32,        // Last consumer PID
    
    // Initialization state
    magic: u32,
    is_initialized: AtomicU32,
    
    // The actual ring buffer data follows this struct in memory
    // We don't store it as a field because it's variable-sized
    _phantom: PhantomData<T>,
}

impl<T> SharedQueueData<T> {
    const MAGIC: u32 = 0x51554555; // "QUEU"
    
    /// Initialize the shared queue data (called once)
    pub unsafe fn initialize(&mut self, capacity: usize) {
        self.magic = Self::MAGIC;
        self.capacity = capacity;
        self.element_size = size_of::<T>();
        
        // Initialize atomic indices
        self.head.store(0, Ordering::Relaxed);
        self.tail.store(0, Ordering::Relaxed);
        
        // Initialize statistics
        self.total_enqueues.store(0, Ordering::Relaxed);
        self.total_dequeues.store(0, Ordering::Relaxed);
        self.overruns.store(0, Ordering::Relaxed);
        
        self.producer_pid.store(0, Ordering::Relaxed);
        self.consumer_pid.store(0, Ordering::Relaxed);
        
        // Zero out the ring buffer data
        let data_ptr = self.data_ptr();
        core::ptr::write_bytes(data_ptr, 0, capacity * size_of::<T>());
        
        // Mark as initialized (release ordering ensures all writes above are visible)
        self.is_initialized.store(1, Ordering::Release);
    }
    
    /// Check if queue is properly initialized
    pub fn is_ready(&self) -> bool {
        self.magic == Self::MAGIC && 
        self.is_initialized.load(Ordering::Acquire) == 1
    }
    
    /// Get pointer to the ring buffer data
    pub fn data_ptr(&self) -> *mut T {
        unsafe {
            (self as *const _ as *mut u8)
                .add(size_of::<SharedQueueData<T>>())
                .cast::<T>()
        }
    }
    
    /// Calculate total size needed for this queue
    pub fn total_size(capacity: usize) -> usize {
        size_of::<SharedQueueData<T>>() + (capacity * size_of::<T>())
    }
    
    /// Get current queue length (approximate - may be stale)
    pub fn len(&self) -> usize {
        let tail = self.tail.load(Ordering::Acquire);
        let head = self.head.load(Ordering::Acquire);
        
        if tail >= head {
            tail - head
        } else {
            self.capacity - (head - tail)
        }
    }
    
    /// Check if queue is empty (approximate)
    pub fn is_empty(&self) -> bool {
        self.head.load(Ordering::Acquire) == self.tail.load(Ordering::Acquire)
    }
    
    /// Check if queue is full (approximate) 
    pub fn is_full(&self) -> bool {
        let tail = self.tail.load(Ordering::Acquire);
        let head = self.head.load(Ordering::Acquire);
        let next_tail = (tail + 1) % self.capacity;
        next_tail == head
    }
}

// CRITICAL: No Drop implementation for SharedQueueData
// This ensures the data persists even when process-local handles are dropped

//==============================================================================
// PROCESS-LOCAL QUEUE HANDLES (These CAN be dropped safely)
//==============================================================================

#[derive(Debug)]
pub enum QueueError {
    NotInitialized,
    Full,
    Empty,
    CorruptedData,
}

/// Process-local handle to a persistent queue
/// This is what gets created/dropped by user code - it's safe to drop
pub struct PersistentQueue<T> (
    NonNull<SharedQueueData<T>>,
    // Note: No ownership of the actual queue data
)

// NonNull<LfQueue<T,N>>
// ShmPtr<LfQueue<T,N>>
// Persist<LfQueue<T,N>>
use core::sync::atomic::{AtomicUsize, AtomicU32, Ordering};
use core::ptr::NonNull;
use core::mem::{size_of, align_of};
use std::collections::HashMap;

/// UNIFIED ALLOCATION INTERFACE DESIGN
/// 
/// The interface handles three key flows:
/// 1. USER → QUEUE: User sends data, allocator provides space, queue manages transfer
/// 2. QUEUE → USER: Queue notifies of data, allocator manages space, user receives
/// 3. ALLOCATOR ↔ QUEUE: Coordination for memory management and cleanup
///
/// Key Philosophy: 
/// - User sees simple send/receive interface
/// - Queue handles message routing and ordering  
/// - Allocator manages underlying memory lifecycle
/// - All three coordinate seamlessly for optimal performance

//==============================================================================
// UNIFIED COMMUNICATION INTERFACE
//==============================================================================

#[derive(Debug)]
pub enum CommunicationError {
    AllocationFailed,
    QueueFull,
    QueueEmpty,
    InvalidMessage,
    ChannelNotFound,
    CorruptedData,
    InsufficientSpace,
}

/// Main interface for queue-based communication with integrated allocation
pub struct UnifiedCommunicationSystem {
    /// TLSF allocator for memory management
    allocator: crate::PersistentTlsfAllocator,
    
    /// Message queues by channel name
    queues: HashMap<String, crate::PersistentQueue<MessageDescriptor>>,
    
    /// Queue factory for creating new queues
    queue_factory: crate::QueueFactory<crate::PersistentTlsfAllocator>,
    
    /// Process identification
    process_id: u32,
    sequence_counter: u64,
    
    /// Shared memory base for pointer calculations
    shared_memory_base: *mut u8,
    
    /// Statistics
    messages_sent: usize,
    messages_received: usize,
    bytes_allocated: usize,
}

impl UnifiedCommunicationSystem {
    /// Initialize the communication system
    pub unsafe fn new(
        shared_memory: *mut u8,
        total_size: usize,
        process_id: u32
    ) -> Result<Self, CommunicationError> {
        
        // Initialize TLSF allocator
        let allocator = crate::PersistentTlsfAllocator::create_or_attach(
            shared_memory,
            total_size,
            process_id
        ).map_err(|_| CommunicationError::AllocationFailed)?;
        
        let queue_factory = crate::QueueFactory::new(allocator.clone(), process_id);
        
        Ok(UnifiedCommunicationSystem {
            allocator,
            queues: HashMap::new(),
            queue_factory,
            process_id,
            sequence_counter: 1,
            shared_memory_base: shared_memory,
            messages_sent: 0,
            messages_received: 0,
            bytes_allocated: 0,
        })
    }
    
    //==========================================================================
    // HIGH-LEVEL USER INTERFACE
    //==========================================================================
    
    /// Send data to a channel (allocates + enqueues)
    /// This is the main user-facing send interface
    pub fn send<T: Clone>(&mut self, channel: &str, message_type: u32, data: &T) -> Result<(), CommunicationError> {
        // 1. Ensure channel exists
        self.ensure_channel_exists(channel)?;
        
        // 2. Allocate space for the data
        let data_size = size_of::<T>();
        let data_ptr = self.allocate_data(data_size)?;
        
        // 3. Write data to shared memory
        unsafe {
            core::ptr::write(data_ptr.cast::<T>().as_ptr(), data.clone());
        }
        
        // 4. Create message descriptor
        let data_offset = unsafe {
            data_ptr.as_ptr() as usize - self.shared_memory_base as usize
        };
        
        let descriptor = MessageDescriptor::new(
            data_offset,
            data_size,
            message_type,
            self.process_id,
            self.sequence_counter,
        );
        
        // 5. Enqueue the descriptor
        let queue = self.queues.get(channel).unwrap();
        queue.enqueue(descriptor).map_err(|_| CommunicationError::QueueFull)?;
        
        // 6. Update state
        self.sequence_counter += 1;
        self.messages_sent += 1;
        self.bytes_allocated += data_size;
        
        Ok(())
    }
    
    /// Send raw bytes to a channel
    pub fn send_bytes(&mut self, channel: &str, message_type: u32, data: &[u8]) -> Result<(), CommunicationError> {
        // 1. Ensure channel exists
        self.ensure_channel_exists(channel)?;
        
        // 2. Allocate space
        let data_ptr = self.allocate_data(data.len())?;
        
        // 3. Copy data
        unsafe {
            core::ptr::copy_nonoverlapping(
                data.as_ptr(),
                data_ptr.as_ptr(),
                data.len()
            );
        }
        
        // 4. Create and enqueue descriptor
        let data_offset = unsafe {
            data_ptr.as_ptr() as usize - self.shared_memory_base as usize
        };
        
        let descriptor = MessageDescriptor::new(
            data_offset,
            data.len(),
            message_type,
            self.process_id,
            self.sequence_counter,
        );
        
        let queue = self.queues.get(channel).unwrap();
        queue.enqueue(descriptor).map_err(|_| CommunicationError::QueueFull)?;
        
        self.sequence_counter += 1;
        self.messages_sent += 1;
        self.bytes_allocated += data.len();
        
        Ok(())
    }
    
    /// Receive data from a channel (dequeues + provides access)
    /// Returns a handle that automatically manages memory lifecycle
    pub fn receive(&mut self, channel: &str) -> Result<MessageHandle, CommunicationError> {
        let queue = self.queues.get(channel)
            .ok_or(CommunicationError::ChannelNotFound)?;
        
        // Dequeue message descriptor
        let descriptor = queue.dequeue()
            .map_err(|_| CommunicationError::QueueEmpty)?;
        
        self.messages_received += 1;
        
        // Create handle for accessing the data
        Ok(MessageHandle::new(descriptor, self.shared_memory_base, &mut self.allocator))
    }
    
    /// Try to receive without blocking
    pub fn try_receive(&mut self, channel: &str) -> Result<MessageHandle, CommunicationError> {
        self.receive(channel) // In this implementation, receive is already non-blocking
    }
    
    /// Send with zero-copy optimization (advanced interface)
    pub fn send_zero_copy(&mut self, channel: &str, message_type: u32, size: usize) 
        -> Result<ZeroCopyWriter, CommunicationError> {
        
        self.ensure_channel_exists(channel)?;
        
        // Allocate space but don't write yet
        let data_ptr = self.allocate_data(size)?;
        
        let data_offset = unsafe {
            data_ptr.as_ptr() as usize - self.shared_memory_base as usize
        };
        
        Ok(ZeroCopyWriter::new(
            channel.to_string(),
            data_ptr,
            data_offset,
            size,
            message_type,
            self.process_id,
            self.sequence_counter,
        ))
    }
    
    //==========================================================================
    // CHANNEL MANAGEMENT
    //==========================================================================
    
    /// Create or get existing channel
    pub fn create_channel(&mut self, name: &str, queue_capacity: usize) -> Result<(), CommunicationError> {
        if self.queues.contains_key(name) {
            return Ok(()); // Already exists
        }
        
        let queue = self.queue_factory.create_queue::<MessageDescriptor>(queue_capacity)
            .map_err(|_| CommunicationError::AllocationFailed)?;
        
        self.queues.insert(name.to_string(), queue);
        Ok(())
    }
    
    /// Connect to existing channel (created by another process)
    pub fn connect_channel(&mut self, name: &str, queue_ptr: NonNull<crate::SharedQueueData<MessageDescriptor>>) 
        -> Result<(), CommunicationError> {
        
        if self.queues.contains_key(name) {
            return Ok(());
        }
        
        let queue = self.queue_factory.connect_to_queue(queue_ptr)
            .map_err(|_| CommunicationError::AllocationFailed)?;
        
        self.queues.insert(name.to_string(), queue);
        Ok(())
    }
    
    /// List all available channels
    pub fn list_channels(&self) -> Vec<String> {
        self.queues.keys().cloned().collect()
    }
    
    //==========================================================================
    // INTERNAL HELPERS
    //==========================================================================
    
    fn ensure_channel_exists(&mut self, channel: &str) -> Result<(), CommunicationError> {
        if !self.queues.contains_key(channel) {
            // Create with default capacity
            self.create_channel(channel, 1000)?;
        }
        Ok(())
    }
    
    fn allocate_data(&mut self, size: usize) -> Result<NonNull<u8>, CommunicationError> {
        let layout = core::alloc::Layout::from_size_align(size, 8)
            .map_err(|_| CommunicationError::AllocationFailed)?;
        
        self.allocator.allocate(layout)
            .map_err(|_| CommunicationError::AllocationFailed)
    }
}


//==============================================================================
// MESSAGE TYPES AND DATA STRUCTURES
//==============================================================================
/// Message metadata that travels through queues
/// This is what gets enqueued/dequeued - contains reference to actual data
#[repr(C)]
#[derive(Clone, Copy, Debug)]
pub struct MessageDescriptor {
    /// Offset from shared memory base to actual data
    data_offset: usize,
    
    /// Size of the allocated data block
    data_size: usize,
    
    /// Message type for application-level routing
    message_type: u32,
    
    /// Source process ID
    source_pid: u32,
    
    /// Sequence number for ordering
    sequence: u64,
    
    /// Timestamp for timeouts and debugging
    timestamp: u64,
    
    /// Reference count - tracks how many readers need this data
    ref_count: AtomicU32,
    
    /// Checksum for data integrity
    checksum: u32,
}

impl MessageDescriptor {
    /// Create new message descriptor
    pub fn new(data_offset: usize, data_size: usize, message_type: u32, 
               source_pid: u32, sequence: u64) -> Self {
        Self {
            data_offset,
            data_size,
            message_type,
            source_pid,
            sequence,
            timestamp: current_timestamp(),
            ref_count: AtomicU32::new(1),
            checksum: 0, // Will be computed later
        }
    }
    
    /// Get pointer to actual data in shared memory
    pub unsafe fn data_ptr(&self, shared_memory_base: *mut u8) -> *mut u8 {
        shared_memory_base.add(self.data_offset)
    }
    
    /// Increment reference count (for multi-reader scenarios)
    pub fn add_ref(&self) -> u32 {
        self.ref_count.fetch_add(1, Ordering::AcqRel) + 1
    }
    
    /// Decrement reference count, returns true if should be deallocated
    pub fn release_ref(&self) -> bool {
        self.ref_count.fetch_sub(1, Ordering::AcqRel) == 1
    }
}

//==============================================================================
// MESSAGE HANDLE - MANAGES DATA LIFECYCLE
//==============================================================================

/// Handle for received messages - automatically manages memory lifecycle
pub struct MessageHandle {
    descriptor: MessageDescriptor,
    shared_memory_base: *mut u8,
    allocator: *mut crate::PersistentTlsfAllocator, // For cleanup on drop
}

impl MessageHandle {
    fn new(
        descriptor: MessageDescriptor,
        shared_memory_base: *mut u8,
        allocator: &mut crate::PersistentTlsfAllocator,
    ) -> Self {
        Self {
            descriptor,
            shared_memory_base,
            allocator: allocator as *mut _,
        }
    }
    
    /// Get message metadata
    pub fn metadata(&self) -> &MessageDescriptor {
        &self.descriptor
    }
    
    /// Get message type
    pub fn message_type(&self) -> u32 {
        self.descriptor.message_type
    }
    
    /// Get source process ID
    pub fn source_pid(&self) -> u32 {
        self.descriptor.source_pid
    }
    
    /// Get data size
    pub fn data_size(&self) -> usize {
        self.descriptor.data_size
    }
    
    /// Read data as specific type
    pub fn read_as<T>(&self) -> Result<&T, CommunicationError> {
        if size_of::<T>() != self.descriptor.data_size {
            return Err(CommunicationError::InvalidMessage);
        }
        
        unsafe {
            let data_ptr = self.descriptor.data_ptr(self.shared_memory_base);
            Ok(&*(data_ptr as *const T))
        }
    }
    
    /// Read data as byte slice
    pub fn read_bytes(&self) -> &[u8] {
        unsafe {
            let data_ptr = self.descriptor.data_ptr(self.shared_memory_base);
            core::slice::from_raw_parts(data_ptr, self.descriptor.data_size)
        }
    }
    
    /// Clone the data out (for owned access)
    pub fn clone_data<T: Clone>(&self) -> Result<T, CommunicationError> {
        Ok(self.read_as::<T>()?.clone())
    }
    
    /// Keep the message for longer (increments reference count)
    pub fn keep_alive(&self) -> MessageHandle {
        self.descriptor.add_ref();
        MessageHandle {
            descriptor: self.descriptor,
            shared_memory_base: self.shared_memory_base,
            allocator: self.allocator,
        }
    }
}

impl Drop for MessageHandle {
    fn drop(&mut self) {
        // Decrement reference count and deallocate if last reference
        if self.descriptor.release_ref() {
            unsafe {
                let data_ptr = NonNull::new_unchecked(
                    self.descriptor.data_ptr(self.shared_memory_base)
                );
                
                if let Some(allocator) = self.allocator.as_mut() {
                    let _ = allocator.deallocate(data_ptr);
                }
            }
        }
    }
}

//==============================================================================
// ZERO-COPY WRITER
//==============================================================================

/// Zero-copy writer for high-performance scenarios
pub struct ZeroCopyWriter {
    channel: String,
    data_ptr: NonNull<u8>,
    data_offset: usize,
    size: usize,
    message_type: u32,
    source_pid: u32,
    sequence: u64,
    committed: bool,
}

impl ZeroCopyWriter {
    fn new(
        channel: String,
        data_ptr: NonNull<u8>,
        data_offset: usize,
        size: usize,
        message_type: u32,
        source_pid: u32,
        sequence: u64,
    ) -> Self {
        Self {
            channel,
            data_ptr,
            data_offset,
            size,
            message_type,
            source_pid,
            sequence,
            committed: false,
        }
    }
    
    /// Get mutable slice for direct writing
    pub fn as_mut_slice(&mut self) -> &mut [u8] {
        unsafe {
            core::slice::from_raw_parts_mut(self.data_ptr.as_ptr(), self.size)
        }
    }
    
    /// Write typed data directly
    pub fn write<T>(&mut self, data: &T) -> Result<(), CommunicationError> {
        if size_of::<T>() != self.size {
            return Err(CommunicationError::InvalidMessage);
        }
        
        unsafe {
            core::ptr::write(self.data_ptr.as_ptr() as *mut T, *data);
        }
        
        Ok(())
    }
    
    /// Commit the write (enqueues the message)
    pub fn commit(mut self, comm_system: &mut UnifiedCommunicationSystem) -> Result<(), CommunicationError> {
        let descriptor = MessageDescriptor::new(
            self.data_offset,
            self.size,
            self.message_type,
            self.source_pid,
            self.sequence,
        );
        
        let queue = comm_system.queues.get(&self.channel)
            .ok_or(CommunicationError::ChannelNotFound)?;
        
        queue.enqueue(descriptor)
            .map_err(|_| CommunicationError::QueueFull)?;
        
        self.committed = true;
        comm_system.sequence_counter += 1;
        comm_system.messages_sent += 1;
        
        Ok(())
    }
}

impl Drop for ZeroCopyWriter {
    fn drop(&mut self) {
        if !self.committed {
            // TODO: Deallocate the reserved space since it wasn't used
            // This requires access to the allocator, which we'd need to store
        }
    }
}

//==============================================================================
// USAGE EXAMPLES
//==============================================================================

/// Example: Basic send/receive
pub fn example_basic_usage() -> Result<(), CommunicationError> {
    let shared_memory = unsafe { allocate_shared_memory(1024 * 1024) };
    
    // Initialize communication system
    let mut comm = unsafe {
        UnifiedCommunicationSystem::new(shared_memory, 1024 * 1024, std::process::id())?
    };
    
    // Create a channel
    comm.create_channel("events", 1000)?;
    
    // Send some data
    let event_data = "System started";
    comm.send_bytes("events", 1, event_data.as_bytes())?;
    
    // Send structured data
    #[derive(Clone)]
    struct StatusUpdate {
        cpu_usage: f32,
        memory_usage: f32,
        timestamp: u64,
    }
    
    let status = StatusUpdate {
        cpu_usage: 45.2,
        memory_usage: 78.1,
        timestamp: 1234567890,
    };
    
    comm.send("events", 2, &status)?;
    
    // Receive data
    let message = comm.receive("events")?;
    match message.message_type() {
        1 => {
            let text = String::from_utf8_lossy(message.read_bytes());
            println!("Received text: {}", text);
        },
        2 => {
            let status = message.read_as::<StatusUpdate>()?;
            println!("CPU: {}%, Memory: {}%", status.cpu_usage, status.memory_usage);
        },
        _ => println!("Unknown message type"),
    }
    
    Ok(())
}

/// Example: Zero-copy high-performance writing
pub fn example_zero_copy() -> Result<(), CommunicationError> {
    let shared_memory = unsafe { allocate_shared_memory(1024 * 1024) };
    let mut comm = unsafe {
        UnifiedCommunicationSystem::new(shared_memory, 1024 * 1024, std::process::id())?
    };
    
    comm.create_channel("bulk_data", 100)?;
    
    // Zero-copy write large data
    let mut writer = comm.send_zero_copy("bulk_data", 10, 4096)?;
    let buffer = writer.as_mut_slice();
    
    // Fill buffer directly (no intermediate copies)
    for (i, byte) in buffer.iter_mut().enumerate() {
        *byte = (i % 256) as u8;
    }
    
    // Commit to queue
    writer.commit(&mut comm)?;
    
    Ok(())
}

//==============================================================================
// UTILITY FUNCTIONS
//==============================================================================

unsafe fn allocate_shared_memory(size: usize) -> *mut u8 {
    // Platform-specific shared memory allocation
    std::alloc::alloc(std::alloc::Layout::from_size_align(size, 4096).unwrap())
}

fn current_timestamp() -> u64 {
    // Platform-specific timestamp
    0 // Placeholder
}

To design the initiation of shared memory, we should take a backend.

pub trait AddrSpec {
    type Addr: MemoryAddr;
    type Flags: Copy;
}

pub trait Map<S: AddrSpec>: Sized {
    type Config;
    type Error: core::fmt::Debug;

    fn map(
        self,
        start: Option<S::Addr>,
        size: usize,
        flags: S::Flags,
        cfg: Self::Config,
    ) -> Result<Area<S, Self>, Self::Error>;
    fn unmap(area: &mut Area<S, Self>) -> Result<(), Self::Error>;
}

pub struct Area<S: AddrSpec, M: Map<S>> {
    va_range: AddrRange<S::Addr>,
    flags: S::Flags,
    bk: M,
}

To adapt the connection logic, one should take once at a time for a connection instance with fixed resolver like name. Then

pub trait ShmInit {
    fn try_claim() -> bool {}
}

pub struct Ipc<T: ShmInit + ...> {
    // Shared memory information
    area: Area<M>

    // Components
    ipc: T
}

impl<T: ShmInit> Ipc<T> {
        /// Initialize as server - this does the SINGLE mmap()
    fn initialize_server(&mut self, name: &str, size: usize) -> Result<(), Self::Error> {
        // 1. Create shared memory (SINGLE mmap call)
        let (shared_memory_base, actual_size) = create_shared_memory(name, size)?;
        
        // 2. Initialize the shared memory structures
        let server = Ipc::new(shared_memory_base, actual_size, name)?;
        
        Ok(())
    }
    
    /// Connect as client - NO mmap(), connects to server instead
    fn connect_client(&mut self, name: &str) -> Result<(), Self::Error> {
        // 1. Connect to server process (not shared memory directly)
        let server_connection = connect_to(name)?;

        // init client same...
        self.attach(sever_connection);
        
        Ok(())
    }

    pub fn attach(&mut self, name: &str, size: usize) -> Result<(), InitError> {
        // 2. Get coordinator (always at offset 0)
        let coord_ptr = shared_memory_base as *mut SharedMemoryCoordinator;
        let coord = unsafe { NonNull::new_unchecked(coordinator_ptr) };
        self.coord = Some(coord);
        
        let coord_ref = unsafe { coordinator.as_mut() };
        coord_ref.try_claim()?;

        // acquire possible local handle...
        
        Ok(())
    }
}