platform/
channel.rs

1// SPDX-FileCopyrightText: 2025 Jens Pitkänen <jens.pitkanen@helsinki.fi>
2//
3// SPDX-License-Identifier: GPL-3.0-or-later
4
5//! Static memory based single-producer single-consumer channel for
6//! communication between threads.
7
8use core::sync::atomic::{AtomicUsize, Ordering};
9
10pub use crossbeam_utils::CachePadded;
11pub use sync_unsafe_cell::SyncUnsafeCell;
12
13use crate::Semaphore;
14
15struct SharedChannelState<T: 'static + Sync> {
16    /// The slice containing the actual elements.
17    queue: &'static [SyncUnsafeCell<CachePadded<Option<T>>>],
18    /// The index to `queue` where the oldest pushed element is. Only mutated by
19    /// [`Receiver::try_recv`]. If `read_offset == write_offset`, the queue
20    /// should be considered empty.
21    read_offset: &'static AtomicUsize,
22    /// The index to `queue` where the next element is pushed. Only mutated by
23    /// [`Sender::send`]. If `write_offset + 1 == read_offset`, the queue is
24    /// considered full. Since writes need to happen before reads, this happens
25    /// when the writes wrap around to the start and reach the read offset.
26    write_offset: &'static AtomicUsize,
27    /// Incremented on every [`Sender::send`], decrement on in every
28    /// [`Receiver::recv`] and [`Receiver::try_recv`].
29    write_semaphore: &'static Semaphore,
30}
31
32/// Return type of [`channel_from_parts`].
33pub type Channel<T> = (Sender<T>, Receiver<T>);
34
35/// Creates a new channel from its raw parts.
36///
37/// The queue should be one longer than the actual bound of the queue, so a
38/// channel with room for 3 buffered elements should have a slice of length 4 as
39/// the `queue`.
40pub fn channel_from_parts<T: Sync>(
41    queue: &'static mut [SyncUnsafeCell<CachePadded<Option<T>>>],
42    write_offset: &'static mut AtomicUsize,
43    read_offset: &'static mut AtomicUsize,
44    write_semaphore: &'static mut Semaphore,
45) -> Channel<T> {
46    read_offset.store(0, Ordering::Release);
47    write_offset.store(0, Ordering::Release);
48    let sender = Sender {
49        ch: SharedChannelState {
50            queue,
51            read_offset,
52            write_offset,
53            write_semaphore,
54        },
55    };
56    let receiver = Receiver {
57        ch: SharedChannelState {
58            queue,
59            read_offset,
60            write_offset,
61            write_semaphore,
62        },
63    };
64    (sender, receiver)
65}
66
67/// One endpoint of a channel, which can be used to send [`Sync`] and `'static`
68/// values to another thread.
69pub struct Sender<T: 'static + Sync> {
70    ch: SharedChannelState<T>,
71}
72
73impl<T: Sync> Sender<T> {
74    /// Returns how many tasks could be sent to be buffered up before the other
75    /// side receives them.
76    pub fn capacity(&self) -> usize {
77        self.ch.queue.len() - 1
78    }
79
80    /// Sends the value into the channel if there's room.
81    pub fn send(&mut self, value: T) -> Result<(), T> {
82        if self.ch.queue.len() <= 1 {
83            // This channel does not have any capacity, always fail.
84            return Err(value);
85        }
86
87        // 1. Acquire-load the read offset, so we know the offset is either what
88        //    we get here or something higher (if the other side `pop`s during
89        //    this function). In either case, we're good as long as we don't
90        //    write past the offset we read here.
91        let read_offset = self.ch.read_offset.load(Ordering::Acquire);
92
93        // 2. Since this is a single-producer channel (and thus we have a mut
94        //    self), we can acquire-load here and release-store later in this
95        //    function and rest assured that the value of write_offset does not
96        //    change in between.
97        let write_offset = self.ch.write_offset.load(Ordering::Acquire);
98
99        let next_write_offset = (write_offset + 1) % self.ch.queue.len();
100
101        // See if the queue is full. Note that the comparison isn't ">=" because
102        // that would cause issues with wrapping. This is still equivalent to
103        // ">=" if we didn't wrap (which is what we want), because:
104        // 1. The read offset is either the value stored in read_offset or
105        //    greater. This means that the value of read_offset never goes
106        //    *down* between push_back calls.
107        // 2. The write offset is always only incremented by 1 per push_back.
108        // => next_write_offset must reach read_offset before becoming greater
109        //    than it. Since write_offset is not incremented in this branch,
110        //    next_write_offset will never actually go past read_offset.
111        if next_write_offset == read_offset {
112            // The queue is full.
113            return Err(value);
114        }
115
116        // 3. Write the value.
117        {
118            let slot_ptr = self.ch.queue[write_offset].get();
119            // Safety: this specific index of the queue is not currently being
120            // read by the Receiver nor written by the Sender:
121            // - The Receiver does not read if read_offset == write_offset, and
122            //   since only Sender mutates write_offset and we have a mutable
123            //   borrow of self, we know Receiver is definitely not reading from
124            //   this index at this time. (After step 4 below, it can read this
125            //   value. Note that we drop this borrow before that step.)
126            // - We have a mutable borrow of this Sender, and there can only be
127            //   one Sender per write_offset, so we're definitely the only
128            //   Sender trying to access this queue.
129            let slot = unsafe { &mut *slot_ptr };
130            assert!(slot.is_none(), "slot should not be populated since the write offset should never go past the read offset");
131            *slot = CachePadded::new(Some(value));
132        }
133
134        // 4. Update the write offset, making the written value visible to the
135        //    receiver.
136        self.ch
137            .write_offset
138            .store(next_write_offset, Ordering::Release);
139
140        self.ch.write_semaphore.increment();
141
142        Ok(())
143    }
144}
145
146/// One endpoint of a channel, which can be used to receive [`Sync`] and
147/// `'static` values from another thread.
148pub struct Receiver<T: 'static + Sync> {
149    ch: SharedChannelState<T>,
150}
151
152impl<T: Sync> Receiver<T> {
153    /// Returns how many tasks could be buffered up to be received without any
154    /// recv calls.
155    pub fn capacity(&self) -> usize {
156        self.ch.queue.len() - 1
157    }
158
159    /// Blocks until the sender sends something, and then returns that value.
160    #[track_caller]
161    pub fn recv(&mut self) -> T {
162        self.ch.write_semaphore.decrement();
163        self.recv_impl()
164            .expect("send should've been called before this receive")
165    }
166
167    fn recv_impl(&mut self) -> Option<T> {
168        if self.ch.queue.len() <= 1 {
169            // This channel does not have any capacity, nothing to receive.
170            return None;
171        }
172
173        // 1. Acquire-load the write offset, so we know the offset is either
174        //    what we get here or something higher (if the other side `push`es
175        //    during this function). In either case, we're good as long as we
176        //    only read elements before this offset. Also, if we're really
177        //    racing against the writes, this should ensure that their write to
178        //    the slot before this one (the "freshest" slot we might read) is
179        //    visible to us as well, since they store this value with release
180        //    ordering.
181        let write_offset = self.ch.write_offset.load(Ordering::Acquire);
182
183        // 2. Since this is a single-consumer channel (and thus we have a mut
184        //    self), we can acquire-load here and release-store later in this
185        //    function and rest assured that the value of read_offset does not
186        //    change in between.
187        let read_offset = self.ch.read_offset.load(Ordering::Acquire);
188
189        // When the offsets match, the queue is considered empty. Otherwise the
190        // read_offset is known to be "before" write_offset (i.e. if it wasn't
191        // for the wrapping, it would be *less*), as read_offset only gets
192        // incremented if it's not equal to write_offset, and write_offset only
193        // gets incremented (if it weren't for the wrapping. But still, it's
194        // always "after" or equal to read_offset).
195        if read_offset == write_offset {
196            return None;
197        }
198
199        // 3. Read the value.
200        let value = {
201            let slot_ptr = self.ch.queue[read_offset].get();
202            // Safety: this specific index of the queue is not currently being
203            // written by the Sender nor read by Receiver:
204            // - We know the current write offset is the value of `write_offset`
205            //   or greater (which may have wrapped), and that `read_offset` is
206            //   before `write_offset`. Since the Sender checks that their write
207            //   offset does not write at the read offset, and we know the read
208            //   offset is the value of `read_offset` due to having a mutable
209            //   borrow of this Receiver, we know Sender isn't writing into
210            //   `read_offset`. This function allows writing into this index in
211            //   step 4, after we're done with this borrow.
212            // - We have a mutable borrow of the Receiver, and only one Receiver
213            //   exists for a given channel, so there's definitely no other
214            //   Receiver reading any index, including this one.
215            let slot = unsafe { &mut *slot_ptr };
216            slot.take()
217                .expect("slot should be populated due to the write offset having passed this index")
218        };
219
220        // 4. Update the read offset, making room for the sender to push one
221        //    more value into the queue. This also signals that the MaybeUninit
222        //    value we read before should be interpreted as uninitialized.
223        let next_read_offset = (read_offset + 1) % self.ch.queue.len();
224        self.ch
225            .read_offset
226            .store(next_read_offset, Ordering::Release);
227
228        Some(value)
229    }
230}
231
/// FIXME: Switch over to `core::cell::SyncUnsafeCell` once it is stable. The
/// stabilization is tracked in the rust-lang issue
/// [#95439](https://github.com/rust-lang/rust/issues/95439).
mod sync_unsafe_cell {
    #![allow(missing_docs)]
    use core::cell::UnsafeCell;

    /// An [`UnsafeCell`] which is [`Sync`] whenever `T` is [`Sync`]. This is
    /// a "manually stabilized" version of the type behind the rustc feature
    /// gate `sync_unsafe_cell`.
    ///
    /// To be replaced with `core::cell::SyncUnsafeCell` when that gets
    /// stabilized, see the rust-lang issue
    /// [#95439](https://github.com/rust-lang/rust/issues/95439). The open
    /// questions there don't seem to imply any issues with the type as it's
    /// implemented here (this type requires `T` to be [`Sync`], and the
    /// naming question is irrelevant for safety).
    #[repr(transparent)]
    pub struct SyncUnsafeCell<T: ?Sized>(UnsafeCell<T>);

    unsafe impl<T> Sync for SyncUnsafeCell<T> where T: ?Sized + Sync {}

    impl<T> SyncUnsafeCell<T> {
        /// Wraps `value` into a new cell.
        #[inline]
        pub const fn new(value: T) -> Self {
            Self(UnsafeCell::new(value))
        }

        /// Returns a raw mutable pointer to the wrapped value.
        #[inline]
        pub const fn get(&self) -> *mut T {
            self.0.get()
        }
    }
}
261
/// Heap-allocates (via `alloc`) and leaks the backing storage for a channel
/// able to buffer `capacity` values, and returns that channel's endpoints.
///
/// The allocations are never freed, so this is just for tests.
#[doc(hidden)]
#[cfg(test)]
pub fn leak_channel<T: Sync>(capacity: usize) -> (Sender<T>, Receiver<T>) {
    extern crate alloc;
    use alloc::boxed::Box;
    use alloc::vec::Vec;

    // One slot more than the capacity, as `channel_from_parts` asks for.
    let slot_count = capacity + 1;
    let slots: Vec<_> = (0..slot_count)
        .map(|_| SyncUnsafeCell::new(CachePadded::new(None)))
        .collect();
    let queue = Box::leak(slots.into_boxed_slice());
    assert_eq!(capacity + 1, queue.len());

    let write_semaphore = Box::leak(Box::new(Semaphore::single_threaded()));
    let write_offset = Box::leak(Box::new(AtomicUsize::new(0)));
    let read_offset = Box::leak(Box::new(AtomicUsize::new(0)));

    channel_from_parts(queue, write_offset, read_offset, write_semaphore)
}
285
#[cfg(test)]
mod tests {
    use crate::channel::leak_channel;

    /// A dummy function with the same bounds on its closure as
    /// `std::thread::spawn`, which just runs the closure right away on the
    /// calling thread. Passing a closure here proves at compile time that
    /// everything it captures is `Send + 'static`.
    fn spawn<F, T>(f: F)
    where
        F: FnOnce() -> T + Send + 'static,
        T: Send + 'static,
    {
        f();
    }

    #[test]
    fn sender_and_receiver_are_send() {
        let (mut tx, mut rx) = leak_channel::<u32>(1);
        spawn(move || tx.send(123).unwrap());
        // The receiver has to be moved into a `Send` closure as well,
        // otherwise only the sender's `Send`-ness gets checked.
        spawn(move || assert_eq!(123, rx.recv()));
    }

    #[test]
    fn waiting_recv_works() {
        let (mut tx, mut rx) = leak_channel::<u32>(1);
        tx.send(123).unwrap();
        assert_eq!(123, rx.recv());
    }

    #[test]
    #[should_panic]
    fn panics_on_single_threaded_recv_without_a_preceding_send() {
        let (_, mut rx) = leak_channel::<u32>(1);
        // Should panic due to SingleThreadedSemaphore being a no-op impl of
        // Semaphore, and there being no matching send() call.
        let _ = rx.recv();
    }

    #[test]
    fn handles_full_queue() {
        const CAP: usize = 6;
        let (mut tx, mut rx) = leak_channel::<usize>(CAP);
        for i in 0..CAP {
            tx.send(123 + i).unwrap();
        }
        // The queue is full now, so the value should be handed right back.
        assert_eq!(Err(123), tx.send(123));
        for i in 0..CAP {
            assert_eq!(123 + i, rx.recv());
        }
    }

    #[test]
    fn wraps_around() {
        let (mut tx, mut rx) = leak_channel::<u32>(2);

        // More sends in total than there are slots, so the offsets wrap.
        tx.send(12).unwrap();
        assert_eq!(12, rx.recv());
        tx.send(34).unwrap();
        assert_eq!(34, rx.recv());
        tx.send(56).unwrap();
        assert_eq!(56, rx.recv());
        tx.send(78).unwrap();
        assert_eq!(78, rx.recv());

        // Fill the channel up to its capacity after having wrapped around.
        tx.send(21).unwrap();
        tx.send(43).unwrap();
        assert_eq!(21, rx.recv());
        assert_eq!(43, rx.recv());
    }
}
353}