platform/channel.rs
// SPDX-FileCopyrightText: 2025 Jens Pitkänen <jens.pitkanen@helsinki.fi>
//
// SPDX-License-Identifier: GPL-3.0-or-later

//! Static memory based single-producer single-consumer channel for
//! communication between threads.
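//!
//! # Example
//!
//! A minimal usage sketch of building and using a channel. The backing
//! storage must be `'static`; it comes from `Box::leak` here for brevity
//! (mirroring the test-only `leak_channel` helper below), but any statically
//! allocated memory works. `Semaphore::single_threaded()` is assumed to be an
//! available constructor, as it is in the tests below.
//!
//! ```ignore
//! use core::sync::atomic::AtomicUsize;
//!
//! // A queue slice of length 3 gives the channel a capacity of 2.
//! let queue = Box::leak(Box::new([
//!     SyncUnsafeCell::new(CachePadded::new(None)),
//!     SyncUnsafeCell::new(CachePadded::new(None)),
//!     SyncUnsafeCell::new(CachePadded::new(None)),
//! ]));
//! let write_offset = Box::leak(Box::new(AtomicUsize::new(0)));
//! let read_offset = Box::leak(Box::new(AtomicUsize::new(0)));
//! let write_semaphore = Box::leak(Box::new(Semaphore::single_threaded()));
//!
//! let (mut tx, mut rx) = channel_from_parts(queue, write_offset, read_offset, write_semaphore);
//! tx.send(123).unwrap();
//! assert_eq!(123, rx.recv());
//! ```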

use core::sync::atomic::{AtomicUsize, Ordering};

pub use crossbeam_utils::CachePadded;
pub use sync_unsafe_cell::SyncUnsafeCell;

use crate::Semaphore;

struct SharedChannelState<T: 'static + Sync> {
    /// The slice containing the actual elements.
    queue: &'static [SyncUnsafeCell<CachePadded<Option<T>>>],
    /// The index to `queue` where the oldest pushed element is. Only mutated
    /// by [`Receiver::recv`]. If `read_offset == write_offset`, the queue
    /// should be considered empty.
    read_offset: &'static AtomicUsize,
    /// The index to `queue` where the next element is pushed. Only mutated by
    /// [`Sender::send`]. If `(write_offset + 1) % queue.len() == read_offset`,
    /// the queue is considered full: since writes need to happen before reads,
    /// this is the case where the writes have wrapped around to the start and
    /// caught up with the read offset.
    write_offset: &'static AtomicUsize,
    /// Incremented on every [`Sender::send`], decremented on every
    /// [`Receiver::recv`].
    write_semaphore: &'static Semaphore,
}
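
// An illustration of the ring buffer states for a queue of length 4 (i.e. a
// capacity of 3), with `r` marking read_offset, `w` marking write_offset, and
// `x` marking occupied slots:
//
//   [ .  .  .  . ]  r == w == 0: empty
//     rw
//   [ x  x  .  . ]  r == 0, w == 2: two buffered elements
//     r     w
//   [ x  x  x  . ]  r == 0, w == 3: full, since (w + 1) % 4 == r
//     r        w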

/// Return type of [`channel_from_parts`].
pub type Channel<T> = (Sender<T>, Receiver<T>);

/// Creates a new channel from its raw parts.
///
/// The queue slice should be one element longer than the channel's actual
/// capacity: a channel with room for 3 buffered elements should have a slice
/// of length 4 as the `queue`.
pub fn channel_from_parts<T: Sync>(
    queue: &'static mut [SyncUnsafeCell<CachePadded<Option<T>>>],
    write_offset: &'static mut AtomicUsize,
    read_offset: &'static mut AtomicUsize,
    write_semaphore: &'static mut Semaphore,
) -> Channel<T> {
    read_offset.store(0, Ordering::Release);
    write_offset.store(0, Ordering::Release);
    let sender = Sender {
        ch: SharedChannelState {
            queue,
            read_offset,
            write_offset,
            write_semaphore,
        },
    };
    let receiver = Receiver {
        ch: SharedChannelState {
            queue,
            read_offset,
            write_offset,
            write_semaphore,
        },
    };
    (sender, receiver)
}

/// One endpoint of a channel, which can be used to send [`Sync`] and `'static`
/// values to another thread.
pub struct Sender<T: 'static + Sync> {
    ch: SharedChannelState<T>,
}

impl<T: Sync> Sender<T> {
    /// Returns how many values could be sent to be buffered up before the
    /// other side receives them.
    pub fn capacity(&self) -> usize {
        // Saturate so that a zero-length backing slice reports zero capacity
        // instead of underflowing.
        self.ch.queue.len().saturating_sub(1)
    }

    /// Sends the value into the channel if there's room, returning it back as
    /// an `Err` otherwise.
    pub fn send(&mut self, value: T) -> Result<(), T> {
        if self.ch.queue.len() <= 1 {
            // This channel does not have any capacity, always fail.
            return Err(value);
        }

        // 1. Acquire-load the read offset, so we know the offset is either what
        //    we get here or something higher (if the other side `recv`s during
        //    this function). In either case, we're good as long as we don't
        //    write past the offset we read here.
        let read_offset = self.ch.read_offset.load(Ordering::Acquire);

        // 2. Since this is a single-producer channel (and thus we have a mut
        //    self), we can acquire-load here and release-store later in this
        //    function and rest assured that the value of write_offset does not
        //    change in between.
        let write_offset = self.ch.write_offset.load(Ordering::Acquire);

        let next_write_offset = (write_offset + 1) % self.ch.queue.len();

        // See if the queue is full. Note that the comparison isn't ">=" because
        // that would cause issues with wrapping. This is still equivalent to
        // ">=" if we didn't wrap (which is what we want), because:
        // 1. The read offset is either the value stored in read_offset or
        //    greater. This means that the value of read_offset never goes
        //    *down* between send calls.
        // 2. The write offset is always only incremented by 1 per send.
        // => next_write_offset must reach read_offset before becoming greater
        //    than it. Since write_offset is not incremented in this branch,
        //    next_write_offset will never actually go past read_offset.
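        //
        // Concretely, with queue.len() == 4: if read_offset == 0 and
        // write_offset == 3, then next_write_offset == (3 + 1) % 4 == 0 ==
        // read_offset, so the queue is full while holding 3 buffered values,
        // matching the channel's capacity of queue.len() - 1.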
        if next_write_offset == read_offset {
            // The queue is full.
            return Err(value);
        }

        // 3. Write the value.
        {
            let slot_ptr = self.ch.queue[write_offset].get();
            // Safety: this specific index of the queue is not currently being
            // read by the Receiver nor written by the Sender:
            // - The Receiver does not read if read_offset == write_offset, and
            //   since only Sender mutates write_offset and we have a mutable
            //   borrow of self, we know Receiver is definitely not reading from
            //   this index at this time. (After step 4 below, it can read this
            //   value. Note that we drop this borrow before that step.)
            // - We have a mutable borrow of this Sender, and there can only be
            //   one Sender per write_offset, so we're definitely the only
            //   Sender trying to access this queue.
            let slot = unsafe { &mut *slot_ptr };
            assert!(
                slot.is_none(),
                "slot should not be populated since the write offset should never go past the read offset"
            );
            *slot = CachePadded::new(Some(value));
        }

        // 4. Update the write offset, making the written value visible to the
        //    receiver.
        self.ch
            .write_offset
            .store(next_write_offset, Ordering::Release);

        self.ch.write_semaphore.increment();

        Ok(())
    }
}

/// One endpoint of a channel, which can be used to receive [`Sync`] and
/// `'static` values from another thread.
pub struct Receiver<T: 'static + Sync> {
    ch: SharedChannelState<T>,
}

impl<T: Sync> Receiver<T> {
    /// Returns how many values could be buffered up to be received without any
    /// `recv` calls.
    pub fn capacity(&self) -> usize {
        // Saturate so that a zero-length backing slice reports zero capacity
        // instead of underflowing.
        self.ch.queue.len().saturating_sub(1)
    }

    /// Blocks until the sender sends something, and then returns that value.
    #[track_caller]
    pub fn recv(&mut self) -> T {
        self.ch.write_semaphore.decrement();
        self.recv_impl()
            .expect("send should've been called before this receive")
    }

    fn recv_impl(&mut self) -> Option<T> {
        if self.ch.queue.len() <= 1 {
            // This channel does not have any capacity, nothing to receive.
            return None;
        }

        // 1. Acquire-load the write offset, so we know the offset is either
        //    what we get here or something higher (if the other side `send`s
        //    during this function). In either case, we're good as long as we
        //    only read elements before this offset. Also, if we're really
        //    racing against the writes, this should ensure that their write to
        //    the slot before this one (the "freshest" slot we might read) is
        //    visible to us as well, since they store this value with release
        //    ordering.
        let write_offset = self.ch.write_offset.load(Ordering::Acquire);

        // 2. Since this is a single-consumer channel (and thus we have a mut
        //    self), we can acquire-load here and release-store later in this
        //    function and rest assured that the value of read_offset does not
        //    change in between.
        let read_offset = self.ch.read_offset.load(Ordering::Acquire);

        // When the offsets match, the queue is considered empty. Otherwise
        // read_offset is known to be "before" write_offset (i.e. if it weren't
        // for the wrapping, it would be *less*): read_offset only gets
        // incremented when it's not equal to write_offset, and write_offset
        // only ever moves forward, so (wrapping aside) it's always "after" or
        // equal to read_offset.
        if read_offset == write_offset {
            return None;
        }

        // 3. Read the value.
        let value = {
            let slot_ptr = self.ch.queue[read_offset].get();
            // Safety: this specific index of the queue is not currently being
            // written by the Sender nor read by the Receiver:
            // - We know the current write offset is the value of `write_offset`
            //   or greater (which may have wrapped), and that `read_offset` is
            //   before `write_offset`. Since the Sender checks that their write
            //   offset does not write at the read offset, and we know the read
            //   offset is the value of `read_offset` due to having a mutable
            //   borrow of this Receiver, we know Sender isn't writing into
            //   `read_offset`. This function allows writing into this index in
            //   step 4, after we're done with this borrow.
            // - We have a mutable borrow of the Receiver, and only one Receiver
            //   exists for a given channel, so there's definitely no other
            //   Receiver reading any index, including this one.
            let slot = unsafe { &mut *slot_ptr };
            slot.take()
                .expect("slot should be populated due to the write offset having passed this index")
        };

        // 4. Update the read offset, making room for the sender to push one
        //    more value into the queue. This also signals that the slot we
        //    just took the value out of is free to be overwritten by the
        //    sender.
        let next_read_offset = (read_offset + 1) % self.ch.queue.len();
        self.ch
            .read_offset
            .store(next_read_offset, Ordering::Release);

        Some(value)
    }
}

/// FIXME: Use core::cell::SyncUnsafeCell once it's stabilized. Tracked in the
/// rust-lang issue
/// [#95439](https://github.com/rust-lang/rust/issues/95439).
mod sync_unsafe_cell {
    #![allow(missing_docs)]
    use core::cell::UnsafeCell;
    /// A [`Sync`] [`UnsafeCell`] when `T` is [`Sync`]. "Manually stabilized"
    /// from rustc feature gate `sync_unsafe_cell`.
    ///
    /// Should be replaced with `core::cell::SyncUnsafeCell` once it's
    /// stabilized. Tracked in the rust-lang issue
    /// [#95439](https://github.com/rust-lang/rust/issues/95439). The open
    /// questions don't seem to imply any issues with the type as it's
    /// implemented here (this type requires `T` to be [`Sync`], and the naming
    /// question is irrelevant for safety).
    #[repr(transparent)]
    pub struct SyncUnsafeCell<T: ?Sized>(UnsafeCell<T>);
    unsafe impl<T: ?Sized + Sync> Sync for SyncUnsafeCell<T> {}
    impl<T> SyncUnsafeCell<T> {
        #[inline]
        pub const fn new(value: T) -> Self {
            SyncUnsafeCell(UnsafeCell::new(value))
        }
        #[inline]
        pub const fn get(&self) -> *mut T {
            self.0.get()
        }
    }
}

/// Allocates the memory for a channel with the given capacity using `alloc` and
/// leaks the memory to create a channel.
///
/// This is just for tests.
#[doc(hidden)]
#[cfg(test)]
pub fn leak_channel<T: Sync>(capacity: usize) -> (Sender<T>, Receiver<T>) {
    extern crate alloc;
    use alloc::boxed::Box;
    use alloc::vec::Vec;

    let mut queue_vec = Vec::with_capacity(capacity + 1);
    for _ in 0..capacity + 1 {
        queue_vec.push(SyncUnsafeCell::new(CachePadded::new(None)));
    }
    let queue = Box::leak(queue_vec.into_boxed_slice());
    assert_eq!(capacity + 1, queue.len());
    let read_offset = Box::leak(Box::new(AtomicUsize::new(0)));
    let write_offset = Box::leak(Box::new(AtomicUsize::new(0)));
    let write_semaphore = Box::leak(Box::new(Semaphore::single_threaded()));

    channel_from_parts(queue, write_offset, read_offset, write_semaphore)
}

#[cfg(test)]
mod tests {
    use crate::channel::leak_channel;

    /// A dummy function that matches the signature of `std::thread::spawn`.
    fn spawn<F, T>(f: F)
    where
        F: FnOnce() -> T + Send + 'static,
        T: Send + 'static,
    {
        f();
    }

    #[test]
    fn sender_and_receiver_are_send() {
        let (mut tx, mut rx) = leak_channel::<u32>(1);
        spawn(move || tx.send(123).unwrap());
        assert_eq!(123, rx.recv());
    }

    #[test]
    fn waiting_recv_works() {
        let (mut tx, mut rx) = leak_channel::<u32>(1);
        tx.send(123).unwrap();
        assert_eq!(123, rx.recv());
    }

    #[test]
    #[should_panic]
    fn panics_on_single_threaded_recv_without_a_preceding_send() {
        let (_, mut rx) = leak_channel::<u32>(1);
        // Should panic due to SingleThreadedSemaphore being a no-op impl of
        // Semaphore, and there being no matching send() call.
        let _ = rx.recv();
    }

    #[test]
    fn handles_full_queue() {
        const CAP: usize = 6;
        let (mut tx, mut rx) = leak_channel::<usize>(CAP);
        for i in 0..CAP {
            tx.send(123 + i).unwrap();
        }
        assert_eq!(Err(123), tx.send(123));
        for i in 0..CAP {
            assert_eq!(123 + i, rx.recv());
        }
    }
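
    #[test]
    fn send_fails_on_zero_capacity_channel() {
        // A sanity check of the zero-capacity edge case handled in send():
        // with a capacity of 0 the backing slice has length 1, so there is
        // never room to buffer a value and the value is always returned back.
        let (mut tx, _rx) = leak_channel::<u32>(0);
        assert_eq!(Err(123), tx.send(123));
        assert_eq!(0, tx.capacity());
    }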

    #[test]
    fn wraps_around() {
        let (mut tx, mut rx) = leak_channel::<u32>(2);

        tx.send(12).unwrap();
        assert_eq!(12, rx.recv());
        tx.send(34).unwrap();
        assert_eq!(34, rx.recv());
        tx.send(56).unwrap();
        assert_eq!(56, rx.recv());
        tx.send(78).unwrap();
        assert_eq!(78, rx.recv());

        tx.send(21).unwrap();
        tx.send(43).unwrap();
        assert_eq!(21, rx.recv());
        assert_eq!(43, rx.recv());
    }
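
    /// Both endpoints should report the capacity passed to [`leak_channel`],
    /// i.e. one less than the length of the backing slice.
    #[test]
    fn reports_capacity() {
        let (tx, rx) = leak_channel::<u32>(3);
        assert_eq!(3, tx.capacity());
        assert_eq!(3, rx.capacity());
    }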
}