1use crate::notify::{GenericNotify, Internal, Notification};
6use crate::sync::atomic::Ordering;
7use crate::sync::cell::{Cell, UnsafeCell};
8use crate::sync::{Mutex, MutexGuard};
9use crate::{RegisterResult, State, TaskRef};
10
11use core::marker::PhantomPinned;
12use core::mem;
13use core::ops::{Deref, DerefMut};
14use core::pin::Pin;
15use core::ptr::NonNull;
16
17pub(super) struct List<T>(Mutex<Inner<T>>);
18
19struct Inner<T> {
20 head: Option<NonNull<Link<T>>>,
22
23 tail: Option<NonNull<Link<T>>>,
25
26 next: Option<NonNull<Link<T>>>,
28
29 len: usize,
31
32 notified: usize,
34}
35
36impl<T> List<T> {
37 pub(super) fn new() -> Self {
39 Self(Mutex::new(Inner {
40 head: None,
41 tail: None,
42 next: None,
43 len: 0,
44 notified: 0,
45 }))
46 }
47
48 pub(crate) fn try_total_listeners(&self) -> Option<usize> {
50 self.0.try_lock().ok().map(|list| list.len)
51 }
52
53 pub(crate) fn total_listeners(&self) -> usize {
55 self.0.lock().unwrap_or_else(|e| e.into_inner()).len
56 }
57}
58
59impl<T> crate::Inner<T> {
60 fn lock(&self) -> ListLock<'_, '_, T> {
61 ListLock {
62 inner: self,
63 lock: self.list.0.lock().unwrap_or_else(|e| e.into_inner()),
64 }
65 }
66
67 pub(crate) fn insert(&self, mut listener: Pin<&mut Option<Listener<T>>>) {
69 let mut inner = self.lock();
70
71 listener.as_mut().set(Some(Listener {
72 link: UnsafeCell::new(Link {
73 state: Cell::new(State::Created),
74 prev: Cell::new(inner.tail),
75 next: Cell::new(None),
76 }),
77 _pin: PhantomPinned,
78 }));
79 let listener = listener.as_pin_mut().unwrap();
80
81 {
82 let entry_guard = listener.link.get();
83 let entry = unsafe { entry_guard.deref() };
85
86 match mem::replace(&mut inner.tail, Some(entry.into())) {
88 None => inner.head = Some(entry.into()),
89 Some(t) => unsafe { t.as_ref().next.set(Some(entry.into())) },
90 };
91 }
92
93 if inner.next.is_none() {
95 inner.next = inner.tail;
96 }
97
98 inner.len += 1;
100 }
101
102 pub(crate) fn remove(
104 &self,
105 listener: Pin<&mut Option<Listener<T>>>,
106 propagate: bool,
107 ) -> Option<State<T>> {
108 self.lock().remove(listener, propagate)
109 }
110
111 #[cold]
113 pub(crate) fn notify(&self, notify: impl Notification<Tag = T>) -> usize {
114 self.lock().notify(notify)
115 }
116
117 pub(crate) fn register(
122 &self,
123 mut listener: Pin<&mut Option<Listener<T>>>,
124 task: TaskRef<'_>,
125 ) -> RegisterResult<T> {
126 let mut inner = self.lock();
127 let entry_guard = match listener.as_mut().as_pin_mut() {
128 Some(listener) => listener.link.get(),
129 None => return RegisterResult::NeverInserted,
130 };
131 let entry = unsafe { entry_guard.deref() };
133
134 match entry.state.replace(State::NotifiedTaken) {
136 State::Notified { tag, .. } => {
137 inner.remove(listener, false);
139 RegisterResult::Notified(tag)
140 }
141
142 State::Task(other_task) => {
143 entry.state.set(State::Task({
145 if !task.will_wake(other_task.as_task_ref()) {
146 task.into_task()
147 } else {
148 other_task
149 }
150 }));
151
152 RegisterResult::Registered
153 }
154
155 _ => {
156 entry.state.set(State::Task(task.into_task()));
158 RegisterResult::Registered
159 }
160 }
161 }
162}
163
164impl<T> Inner<T> {
165 fn remove(
166 &mut self,
167 mut listener: Pin<&mut Option<Listener<T>>>,
168 propagate: bool,
169 ) -> Option<State<T>> {
170 let entry_guard = listener.as_mut().as_pin_mut()?.link.get();
171 let entry = unsafe { entry_guard.deref() };
172
173 let prev = entry.prev.get();
174 let next = entry.next.get();
175
176 match prev {
178 None => self.head = next,
179 Some(p) => unsafe {
180 p.as_ref().next.set(next);
181 },
182 }
183
184 match next {
186 None => self.tail = prev,
187 Some(n) => unsafe {
188 n.as_ref().prev.set(prev);
189 },
190 }
191
192 if self.next == Some(entry.into()) {
194 self.next = next;
195 }
196
197 let entry = unsafe {
199 listener
200 .get_unchecked_mut()
201 .take()
202 .unwrap()
203 .link
204 .into_inner()
205 };
206
207 let mut state = entry.state.replace(State::Created);
212
213 if state.is_notified() {
215 self.notified -= 1;
216
217 if propagate {
218 let state = mem::replace(&mut state, State::NotifiedTaken);
219 if let State::Notified { additional, tag } = state {
220 let tags = {
221 let mut tag = Some(tag);
222 move || tag.take().expect("tag already taken")
223 };
224 self.notify(GenericNotify::new(1, additional, tags));
225 }
226 }
227 }
228 self.len -= 1;
229
230 Some(state)
231 }
232
233 #[cold]
234 fn notify(&mut self, mut notify: impl Notification<Tag = T>) -> usize {
235 let mut n = notify.count(Internal::new());
236 let is_additional = notify.is_additional(Internal::new());
237
238 if !is_additional {
239 if n < self.notified {
240 return 0;
241 }
242 n -= self.notified;
243 }
244
245 let original_count = n;
246 while n > 0 {
247 n -= 1;
248
249 match self.next {
251 None => return original_count - n - 1,
252
253 Some(e) => {
254 let entry = unsafe { e.as_ref() };
256 self.next = entry.next.get();
257
258 let tag = notify.next_tag(Internal::new());
260 if let State::Task(task) = entry.state.replace(State::Notified {
261 additional: is_additional,
262 tag,
263 }) {
264 task.wake();
265 }
266
267 self.notified += 1;
269 }
270 }
271 }
272
273 original_count - n
274 }
275}
276
277struct ListLock<'a, 'b, T> {
278 lock: MutexGuard<'a, Inner<T>>,
279 inner: &'b crate::Inner<T>,
280}
281
282impl<T> Deref for ListLock<'_, '_, T> {
283 type Target = Inner<T>;
284
285 fn deref(&self) -> &Self::Target {
286 &self.lock
287 }
288}
289
290impl<T> DerefMut for ListLock<'_, '_, T> {
291 fn deref_mut(&mut self) -> &mut Self::Target {
292 &mut self.lock
293 }
294}
295
296impl<T> Drop for ListLock<'_, '_, T> {
297 fn drop(&mut self) {
298 let list = &mut **self;
299
300 let notified = if list.notified < list.len {
302 list.notified
303 } else {
304 usize::MAX
305 };
306
307 self.inner.notified.store(notified, Ordering::Release);
308 }
309}
310
311pub(crate) struct Listener<T> {
312 link: UnsafeCell<Link<T>>,
318
319 _pin: PhantomPinned,
321}
322
323struct Link<T> {
324 state: Cell<State<T>>,
326
327 prev: Cell<Option<NonNull<Link<T>>>>,
329
330 next: Cell<Option<NonNull<Link<T>>>>,
332}
333
334#[cfg(test)]
335mod tests {
336 use super::*;
337 use futures_lite::pin;
338
339 #[cfg(target_family = "wasm")]
340 use wasm_bindgen_test::wasm_bindgen_test as test;
341
342 macro_rules! make_listeners {
343 ($($id:ident),*) => {
344 $(
345 let $id = Option::<Listener<()>>::None;
346 pin!($id);
347 )*
348 };
349 }
350
351 #[test]
352 fn insert() {
353 let inner = crate::Inner::new();
354 make_listeners!(listen1, listen2, listen3);
355
356 inner.insert(listen1.as_mut());
358 inner.insert(listen2.as_mut());
359 inner.insert(listen3.as_mut());
360
361 assert_eq!(inner.lock().len, 3);
362
363 assert_eq!(inner.remove(listen2, false), Some(State::Created));
365 assert_eq!(inner.lock().len, 2);
366
367 assert_eq!(inner.remove(listen1, false), Some(State::Created));
369 assert_eq!(inner.lock().len, 1);
370 }
371
372 #[test]
373 fn drop_non_notified() {
374 let inner = crate::Inner::new();
375 make_listeners!(listen1, listen2, listen3);
376
377 inner.insert(listen1.as_mut());
379 inner.insert(listen2.as_mut());
380 inner.insert(listen3.as_mut());
381
382 inner.notify(GenericNotify::new(1, false, || ()));
384
385 inner.remove(listen3, true);
387
388 inner.remove(listen1, true);
390 inner.remove(listen2, true);
391 }
392}