engine/collections/
channel.rs

1// SPDX-FileCopyrightText: 2025 Jens Pitkänen <jens.pitkanen@helsinki.fi>
2//
3// SPDX-License-Identifier: GPL-3.0-or-later
4
5use core::{
6    mem::{transmute, MaybeUninit},
7    sync::atomic::AtomicUsize,
8};
9
10use platform::{
11    channel::{channel_from_parts, CachePadded, Receiver, Sender, SyncUnsafeCell},
12    Platform,
13};
14
15use crate::allocators::LinearAllocator;
16
17/// Creates a single-producer single-consumer channel.
18pub fn channel<T: Sync>(
19    platform: &dyn Platform,
20    allocator: &'static LinearAllocator,
21    capacity: usize,
22) -> Option<(Sender<T>, Receiver<T>)> {
23    type ChannelSlot<T> = SyncUnsafeCell<CachePadded<Option<T>>>;
24
25    // +1 to capacity since we're using the last slot as the difference between empty and full.
26    let queue = allocator.try_alloc_uninit_slice::<ChannelSlot<T>>(capacity + 1, None)?;
27    for slot in &mut *queue {
28        slot.write(SyncUnsafeCell::new(CachePadded::new(None)));
29    }
30    // Safety: all the values are initialized above.
31    let queue =
32        unsafe { transmute::<&mut [MaybeUninit<ChannelSlot<T>>], &mut [ChannelSlot<T>]>(queue) };
33
34    let offsets = allocator.try_alloc_uninit_slice::<AtomicUsize>(2, None)?;
35    for offset in &mut *offsets {
36        offset.write(AtomicUsize::new(0));
37    }
38    // Safety: all the values are initialized above.
39    let offsets =
40        unsafe { transmute::<&mut [MaybeUninit<AtomicUsize>], &mut [AtomicUsize]>(offsets) };
41    let (read, write) = offsets.split_at_mut(1);
42
43    let semaphore = allocator.try_alloc_uninit_slice(1, None)?;
44    let semaphore = semaphore[0].write(platform.create_semaphore());
45
46    Some(channel_from_parts(
47        queue,
48        &mut read[0],
49        &mut write[0],
50        semaphore,
51    ))
52}